大文件分片上传、秒传、断点续传

2023-10-19 10:19 姚铭强 681

什么是大文件分片上传

大文件分片上传是一种将大文件分割成多个小文件进行上传的技术。通常情况下,由于网络传输的限制,上传大文件可能会遇到诸如网络不稳定、上传速度慢等问题。为了解决这些问题,可以将大文件分割成多个小文件,分别上传,然后在服务器端将这些小文件合并成完整的大文件。

大文件分片上传的意义

  1. 大文件传输过程中,网络连接可能不稳定,导致传输中断。通过将大文件分成小块进行上传,可以减少单个传输过程中的数据量,降低传输中断的风险。
  2. 大文件传输需要较长的时间,而且可能会占用大量的网络带宽。通过将大文件分成小块进行并行上传,可以提高传输效率,减少传输时间。
  3. 如果在上传大文件时发生中断,传统的上传方式需要重新上传整个文件。而通过分片上传,可以记录已经成功上传的分片,当传输中断后再次上传时,只需要上传未完成的分片,从而实现断点续传,节省时间和带宽。
  4. 大文件传输可能会占用服务器的大量资源,包括存储空间和处理能力。通过分片上传,可以将大文件分散存储在多个服务器上,提高服务器资源的利用率。

文件分片上传原理图

操作步骤

一、前端对文件进行读取

 const files = e.target.files;
   if (!files) return;
   //读取文件
   // console.log(files[0]);
   filName.value = files[0].name;
 
   //对文件进行分片操作
   const chunks = createChunks(files[0]);
   // console.log(chunks);

二、前端对文件进行分片操作

通过使用slice函数对文件进行分片,每片的大小为1M,注意文件的大小单位为byte

 const CHUNK_SIZE = 1024 * 1024; //定义分片大小 1M
 // 1M=1024KB=1024*1024B
 //文件分片函数
 const createChunks = (file) => {
   let cur = 0; //分片起始位置
   let chunks = []; //定义数组接收分片
   while (cur < file.size) {
     const blob = file.slice(cur, cur + CHUNK_SIZE); //slice为字节的起始结束位置
     chunks.push(blob);
     cur += CHUNK_SIZE;
   }
   return chunks;
 };

三、前端对文件进行hash运算

将所有分片进行遍历,取第一个和最后一个分片的全部数据,中间切片的前面两个字节,中间两个字节,最后两个字节进行储存,然后将其变为blob数据通过spark-md5库计算hash值

 const calculateHash = (chunks) => {
   return new Promise((resolve) => {
     //1.第一个和最后一个切片全部参与计算
     //2.中间的切片只计算前面两个字节,中间两个字节,最后两个字节
     const targets = []; //存储所有参与计算的切片
     const spark = new SparkMD5.ArrayBuffer();
     const fileReader = new FileReader();
 
     chunks.forEach((chunk, index) => {
       if (index == 0 || index == chunks.length - 1) {
         //1.第一个和最后一个切片全部参与计算
         targets.push(chunk);
       } else {
         //2.中间的切片只计算前面两个字节,中间两个字节,最后两个字节
         targets.push(chunk.slice(0, 2)); //前面两个字节
         targets.push(chunk.slice(CHUNK_SIZE / 2, CHUNK_SIZE / 2 + 2)); //中间两个字节
         targets.push(chunk.slice(CHUNK_SIZE - 2, CHUNK_SIZE)); //最后两个字节
       }
     });
 
     fileReader.readAsArrayBuffer(new Blob(targets));
     fileReader.onload = (e) => {
       spark.append(e.target.result);
       //拿到计算出来的hash值
       // console.log("hash", spark.end());
       resolve(spark.end());
     };
   });
 };

四、前端对文件进行上传

新建FormData,加入hash值,hash值加每片的索引,和每片的文件,新建请求连接池,定义最大并发数为6,当每次请求成功后,进行连接池的销毁和加入。

 //分片上传
 const upLoadChunks = async (chunks,existChunks) => {
   //浏览器的并发请求数是有限的
   const data = chunks.map((chunk, index) => {
     return {
       fileHash: fileHash.value,
       chunkHash: fileHash.value + "-" + index,
       chunk,
     };
   });
 
   //将对象数组转换为formData数组
   const formDatas = data
   //对已上传的切片进行过滤
   .filter((item)=>!existChunks.includes(item.chunkHash))
   .map((item) => {
     const formData = new FormData();
 
     formData.append("fileHash", item.fileHash);
     formData.append("chunkHash", item.chunkHash);
     formData.append("chunk", item.chunk);
 
     return formData;
   });
 
   // console.log(formDatas);
 
   const max = 6; //定义最大并发请求数
   let index = 0; //定义下标,当前上传第几个
   const taskPool = []; //给每一个请求建立请求池
   while (index < formDatas.length) {
     //发起请求  使用fetch方法发送POST请求,将文件数据作为请求体发送到服务器,并将返回的Promise对象赋值给task变量。
     const task = fetch("http://127.0.0.1:3000/upload", {
       method: "POST",
       body: formDatas[index],
     });
 
     taskPool.splice(taskPool.findIndex((item) => item == task));
     taskPool.push(task);
     //如果请求池的数量等于最大请求数
     if (taskPool.length == max) {
       await Promise.race(taskPool); //等待任意一个promise对象的状态变为resolve或reject
     }
     index++;
   }
   await Promise.all(taskPool);
 
   //发起和并请求(通知服务器合并文件)
   mergeRequest();
 };

五、后端接收文件

创建临时文件目录,以文件生成的hash值命名,将上传的文件存放到临时目录中,并返回成功的状态。

 app.post('/upload', function (req, res) {
 
     const form = new multiparty.Form()
     form.parse(req, async (err, fields, files) => {
         if (err) {
             res.status(401).json({
                 ok: false,
                 msg: '上传失败,请重新上传'
             })
             return
         }
 
         const fileHash = fields['fileHash'][0]
         const chunkHash = fields['chunkHash'][0]
 
         //创建临时存放目录fileHash命名
         const chunkPath = path.resolve(UPLOAD_DIR, fileHash)
         if (!fse.existsSync(chunkPath)) {
             await fse.mkdir(chunkPath)
         }
         //将切片放入文件夹里面chunkHash命名
         const oldPath = files['chunk'][0]['path']//系统临时存放目录文件
 
         await fse.move(oldPath, path.resolve(chunkPath, chunkHash))
 
         res.status(200).json({
             ok: true,
             msg: '上传成功'
         })
     })
 })

六、前端发起文件合并请求

 //对分片文件进行合并
 const mergeRequest = () => {
   fetch("http://127.0.0.1:3000/merge", {
     method: "POST",
     headers: {
       "content-type": "application/json",
     },
     body: JSON.stringify({
       fileHash: fileHash.value,
       filName: filName.value,
       size: CHUNK_SIZE,
     }),
   }).then((res) => {
     alert("合并成功了!!!");
   });
 };

七、后端对文件进行合并

查看文件是否存在,如果文件存在就没有必要合并就返回合并成功,节省上传带宽。读取文件目录,对文件目录进行排序操作。排序后对文件进行合并操作

 app.post('/merge', async function (req, res) {
     const { fileHash, filName, size } = req.body
     // console.log(fileHash, filName, size)
 
     //如果已经存在该文件,就没必要合并了
     //完整的文件的路径
     const filePath = path.resolve(UPLOAD_DIR, fileHash + extractExt(filName))
 
     //如果已经存在该文件,就没必要合并了
     if (fse.existsSync(filePath)) {
         res.status(200).json({
             ok: true,
             msg: '合并成功'
         })
         return
     }
 
     //如果不存在该文件,采取合并
     const chunkDir = path.resolve(UPLOAD_DIR, fileHash)
     if (!fse.existsSync(chunkDir)) {
         res.status(401).json({
             ok: false,
             msg: '合并失败,请重新上传'
         })
         return
     }
 
     //合并操作
     const chunkPaths = await fse.readdir(chunkDir)
     // console.log(chunkPaths,chunkPaths.length)
     //进行排序操作
     chunkPaths.sort((a, b) => {
         return a.split('-')[1] - b.split('-')[1]
     })
     console.log(chunkPaths)
     const list = chunkPaths.map((chunkName, index) => {
         return new Promise(resolve => {
             const chunkPath = path.resolve(chunkDir, chunkName)
             const readStream = fse.createReadStream(chunkPath)
             const writeStream = fse.createWriteStream(filePath, {
                 start: index * size,
                 end: (index + 1) * size
             })
             //读取完成后移出切片
             readStream.on('end', async () => {
                 await fse.unlink(chunkPath)
                 resolve()
             })
             readStream.pipe(writeStream)
         })
     })
 
     await Promise.all(list)
     await fse.remove(chunkDir)//删除创建的临时文件夹
 
     res.status(200).json({
         ok: true,
         msg: '合并成功'
     })
 })

八、实现秒传操作

在发起合并请求之前发起妙传请求,将hash值传到后端,如果后端存在对应的文件则,返回不用上传的字段,前端接收到字段,不再上传,实现秒传的操作

 app.post('/verify', async function (req, res) {
     const { fileHash, filName } = req.body
     // console.log(fileHash, filName);
 
     const filePath = path.resolve(UPLOAD_DIR, fileHash + extractExt(filName))
 
     //返回服务器已经上传成功的切片
     const chunkDir=path.join(UPLOAD_DIR,fileHash)
     let chunkPaths=[]
     //如果存在对应的临时文件夹才去读取
     if(fse.existsSync(chunkDir)){
         chunkPaths = await fse.readdir(chunkDir)
         console.log(chunkPaths)
     }
 
     if (fse.existsSync(filePath)) {
         //如果存在就不用上传
         res.status(200).json({
             ok: true,
             shouldUpload: false
         })
     } else {
         //如果不存在重新上传
         res.status(200).json({
             ok: true,
             shouldUpload: true,
             existChunks:chunkPaths
         })
     }
 
 })

九、实现断点续传的操作

在第八步的代码中获取已上传的文件切片,当需要上传文件时,返回已上传文件的列表,前端在上传文件时,过滤已完成的文件

后端

前端


全部代码

前端

 <template>
   <div>
     <h1>大文件上传</h1>
     <input type="file" @change="handleUpload" />
   </div>
 </template>
 
 <script setup>
 import SparkMD5 from "spark-md5"; //引入hash计算库
 import { ref } from "vue";
 //读取本地文件的逻辑
 const handleUpload = async (e) => {
   // console.log(e.target.files);//伪数组
   const files = e.target.files;
   if (!files) return;
   //读取文件
   // console.log(files[0]);
   filName.value = files[0].name;
 
   //对文件进行分片操作
   const chunks = createChunks(files[0]);
   // console.log(chunks);
 
   //hash计算
   const hash = await calculateHash(chunks);
   // console.log(hash);
   fileHash.value = hash;
 
   //校验hash值实现秒传
   const data = await verify();
   if(!data.shouldUpload){
     alert('秒传:上传成功')
     return
   }
 
   //分片上传
   upLoadChunks(chunks,data.existChunks);
 };
 
 const CHUNK_SIZE = 1024 * 1024; //定义分片大小 1M
 const fileHash = ref(""); //文件hash值
 const filName = ref(""); //文件名
 
 // 1M=1024KB=1024*1024B
 //文件分片函数
 const createChunks = (file) => {
   let cur = 0; //分片起始位置
   let chunks = []; //定义数组接收分片
   while (cur < file.size) {
     const blob = file.slice(cur, cur + CHUNK_SIZE); //slice为字节的起始结束位置
     chunks.push(blob);
     cur += CHUNK_SIZE;
   }
   return chunks;
 };
 
 //hash计算函数
 const calculateHash = (chunks) => {
   return new Promise((resolve) => {
     //1.第一个和最后一个切片全部参与计算
     //2.中间的切片只计算前面两个字节,中间两个字节,最后两个字节
     const targets = []; //存储所有参与计算的切片
     const spark = new SparkMD5.ArrayBuffer();
     const fileReader = new FileReader();
 
     chunks.forEach((chunk, index) => {
       if (index == 0 || index == chunks.length - 1) {
         //1.第一个和最后一个切片全部参与计算
         targets.push(chunk);
       } else {
         //2.中间的切片只计算前面两个字节,中间两个字节,最后两个字节
         targets.push(chunk.slice(0, 2)); //前面两个字节
         targets.push(chunk.slice(CHUNK_SIZE / 2, CHUNK_SIZE / 2 + 2)); //中间两个字节
         targets.push(chunk.slice(CHUNK_SIZE - 2, CHUNK_SIZE)); //最后两个字节
       }
     });
 
     fileReader.readAsArrayBuffer(new Blob(targets));
     fileReader.onload = (e) => {
       spark.append(e.target.result);
       //拿到计算出来的hash值
       // console.log("hash", spark.end());
       resolve(spark.end());
     };
   });
 };
 
 //分片上传
 const upLoadChunks = async (chunks,existChunks) => {
   //浏览器的并发请求数是有限的
   const data = chunks.map((chunk, index) => {
     return {
       fileHash: fileHash.value,
       chunkHash: fileHash.value + "-" + index,
       chunk,
     };
   });
 
   //将对象数组转换为formData数组
   const formDatas = data
   //对已上传的切片进行过滤
   .filter((item)=>!existChunks.includes(item.chunkHash))
   .map((item) => {
     const formData = new FormData();
 
     formData.append("fileHash", item.fileHash);
     formData.append("chunkHash", item.chunkHash);
     formData.append("chunk", item.chunk);
 
     return formData;
   });
 
   // console.log(formDatas);
 
   const max = 6; //定义最大并发请求数
   let index = 0; //定义下标,当前上传第几个
   const taskPool = []; //给每一个请求建立请求池
   while (index < formDatas.length) {
     //发起请求  使用fetch方法发送POST请求,将文件数据作为请求体发送到服务器,并将返回的Promise对象赋值给task变量。
     const task = fetch("http://127.0.0.1:3000/upload", {
       method: "POST",
       body: formDatas[index],
     });
 
     taskPool.splice(taskPool.findIndex((item) => item == task));
     taskPool.push(task);
     //如果请求池的数量等于最大请求数
     if (taskPool.length == max) {
       await Promise.race(taskPool); //等待任意一个promise对象的状态变为resolve或reject
     }
     index++;
   }
   await Promise.all(taskPool);
 
   //发起和并请求(通知服务器合并文件)
   mergeRequest();
 };
 //对分片文件进行合并
 const mergeRequest = () => {
   fetch("http://127.0.0.1:3000/merge", {
     method: "POST",
     headers: {
       "content-type": "application/json",
     },
     body: JSON.stringify({
       fileHash: fileHash.value,
       filName: filName.value,
       size: CHUNK_SIZE,
     }),
   }).then((res) => {
     alert("合并成功了!!!");
   });
 };
 // 校验hash值实现秒传
 const verify = () => {
   return fetch("http://127.0.0.1:3000/verify", {
     method: "POST",
     headers: {
       "content-type": "application/json",
     },
     body: JSON.stringify({
       fileHash: fileHash.value,
       filName: filName.value,
     }),
   })
     .then((res) => {
       return res.json();
     })
 };
 </script>
 
 <style lang="scss" scoped>
 </style>

后端

 const express = require('express')
 const path = require('path')
 const multiparty = require('multiparty')
 const fse = require('fs-extra')
 const cors = require('cors')
 const bodyParser = require('body-parser')
 
 const app = express()
 app.use(bodyParser.json())
 app.use(cors())
 
 //提取文件后缀名
 const extractExt = filename => {
     return filename.slice(filename.lastIndexOf('.'), filename.length)
 }
 
 const UPLOAD_DIR = path.resolve(__dirname, 'uploads')
 
 app.post('/upload', function (req, res) {
 
     const form = new multiparty.Form()
     form.parse(req, async (err, fields, files) => {
         if (err) {
             res.status(401).json({
                 ok: false,
                 msg: '上传失败,请重新上传'
             })
             return
         }
 
         const fileHash = fields['fileHash'][0]
         const chunkHash = fields['chunkHash'][0]
 
         //创建临时存放目录fileHash命名
         const chunkPath = path.resolve(UPLOAD_DIR, fileHash)
         if (!fse.existsSync(chunkPath)) {
             await fse.mkdir(chunkPath)
         }
         //将切片放入文件夹里面chunkHash命名
         const oldPath = files['chunk'][0]['path']//系统临时存放目录文件
 
         await fse.move(oldPath, path.resolve(chunkPath, chunkHash))
 
         res.status(200).json({
             ok: true,
             msg: '上传成功'
         })
     })
 })
 
 app.post('/merge', async function (req, res) {
     const { fileHash, filName, size } = req.body
     // console.log(fileHash, filName, size)
 
     //如果已经存在该文件,就没必要合并了
     //完整的文件的路径
     const filePath = path.resolve(UPLOAD_DIR, fileHash + extractExt(filName))
 
     //如果已经存在该文件,就没必要合并了
     if (fse.existsSync(filePath)) {
         res.status(200).json({
             ok: true,
             msg: '合并成功'
         })
         return
     }
 
     //如果不存在该文件,采取合并
     const chunkDir = path.resolve(UPLOAD_DIR, fileHash)
     if (!fse.existsSync(chunkDir)) {
         res.status(401).json({
             ok: false,
             msg: '合并失败,请重新上传'
         })
         return
     }
 
     //合并操作
     const chunkPaths = await fse.readdir(chunkDir)
     // console.log(chunkPaths,chunkPaths.length)
     //进行排序操作
     chunkPaths.sort((a, b) => {
         return a.split('-')[1] - b.split('-')[1]
     })
     console.log(chunkPaths)
     const list = chunkPaths.map((chunkName, index) => {
         return new Promise(resolve => {
             const chunkPath = path.resolve(chunkDir, chunkName)
             const readStream = fse.createReadStream(chunkPath)
             const writeStream = fse.createWriteStream(filePath, {
                 start: index * size,
                 end: (index + 1) * size
             })
             //读取完成后移出切片
             readStream.on('end', async () => {
                 await fse.unlink(chunkPath)
                 resolve()
             })
             readStream.pipe(writeStream)
         })
     })
 
     await Promise.all(list)
     await fse.remove(chunkDir)//删除创建的临时文件夹
 
     res.status(200).json({
         ok: true,
         msg: '合并成功'
     })
 })
 
 app.post('/verify', async function (req, res) {
     const { fileHash, filName } = req.body
     // console.log(fileHash, filName);
 
     const filePath = path.resolve(UPLOAD_DIR, fileHash + extractExt(filName))
 
     //返回服务器已经上传成功的切片
     const chunkDir=path.join(UPLOAD_DIR,fileHash)
     let chunkPaths=[]
     //如果存在对应的临时文件夹才去读取
     if(fse.existsSync(chunkDir)){
         chunkPaths = await fse.readdir(chunkDir)
         console.log(chunkPaths)
     }
 
     if (fse.existsSync(filePath)) {
         //如果存在就不用上传
         res.status(200).json({
             ok: true,
             shouldUpload: false
         })
     } else {
         //如果不存在重新上传
         res.status(200).json({
             ok: true,
             shouldUpload: true,
             existChunks:chunkPaths
         })
     }
 
 })
 
 app.listen(3000, () => {
     console.log('Server is running on port 3000')
 })