什么是大文件分片上传
大文件分片上传是一种将大文件分割成多个小文件进行上传的技术。通常情况下,由于网络传输的限制,上传大文件可能会遇到诸如网络不稳定、上传速度慢等问题。为了解决这些问题,可以将大文件分割成多个小文件,分别上传,然后在服务器端将这些小文件合并成完整的大文件。
大文件分片上传的意义
- 大文件传输过程中,网络连接可能不稳定,导致传输中断。通过将大文件分成小块进行上传,可以减少单个传输过程中的数据量,降低传输中断的风险。
- 大文件传输需要较长的时间,而且可能会占用大量的网络带宽。通过将大文件分成小块进行并行上传,可以提高传输效率,减少传输时间。
- 如果在上传大文件时发生中断,传统的上传方式需要重新上传整个文件。而通过分片上传,可以记录已经成功上传的分片,当传输中断后再次上传时,只需要上传未完成的分片,从而实现断点续传,节省时间和带宽。
- 大文件传输可能会占用服务器的大量资源,包括存储空间和处理能力。通过分片上传,可以将大文件分散存储在多个服务器上,提高服务器资源的利用率。
文件分片上传原理图
操作步骤
一、前端对文件进行读取
const files = e.target.files;
if (!files) return;
//读取文件
// console.log(files[0]);
filName.value = files[0].name;
//对文件进行分片操作
const chunks = createChunks(files[0]);
// console.log(chunks);
二、前端对文件进行分片操作
通过使用slice函数对文件进行分片,每片的大小为1M,注意文件的大小单位为byte
const CHUNK_SIZE = 1024 * 1024; //定义分片大小 1M
// 1M=1024KB=1024*1024B
//文件分片函数
const createChunks = (file) => {
let cur = 0; //分片起始位置
let chunks = []; //定义数组接收分片
while (cur < file.size) {
const blob = file.slice(cur, cur + CHUNK_SIZE); //slice为字节的起始结束位置
chunks.push(blob);
cur += CHUNK_SIZE;
}
return chunks;
};
三、前端对文件进行hash运算
将所有分片进行遍历,取第一个和最后一个分片的全部数据,中间切片的前面两个字节,中间两个字节,最后两个字节进行储存,然后将其变为blob数据通过spark-md5库计算hash值
const calculateHash = (chunks) => {
return new Promise((resolve) => {
//1.第一个和最后一个切片全部参与计算
//2.中间的切片只计算前面两个字节,中间两个字节,最后两个字节
const targets = []; //存储所有参与计算的切片
const spark = new SparkMD5.ArrayBuffer();
const fileReader = new FileReader();
chunks.forEach((chunk, index) => {
if (index == 0 || index == chunks.length - 1) {
//1.第一个和最后一个切片全部参与计算
targets.push(chunk);
} else {
//2.中间的切片只计算前面两个字节,中间两个字节,最后两个字节
targets.push(chunk.slice(0, 2)); //前面两个字节
targets.push(chunk.slice(CHUNK_SIZE / 2, CHUNK_SIZE / 2 + 2)); //中间两个字节
targets.push(chunk.slice(CHUNK_SIZE - 2, CHUNK_SIZE)); //最后两个字节
}
});
fileReader.readAsArrayBuffer(new Blob(targets));
fileReader.onload = (e) => {
spark.append(e.target.result);
//拿到计算出来的hash值
// console.log("hash", spark.end());
resolve(spark.end());
};
});
};
四、前端对文件进行上传
新建FormData,加入hash值,hash值加每片的索引,和每片的文件,新建请求连接池,定义最大并发数为6,当每次请求成功后,进行连接池的销毁和加入。
//分片上传
const upLoadChunks = async (chunks,existChunks) => {
//浏览器的并发请求数是有限的
const data = chunks.map((chunk, index) => {
return {
fileHash: fileHash.value,
chunkHash: fileHash.value + "-" + index,
chunk,
};
});
//将对象数组转换为formData数组
const formDatas = data
//对已上传的切片进行过滤
.filter((item)=>!existChunks.includes(item.chunkHash))
.map((item) => {
const formData = new FormData();
formData.append("fileHash", item.fileHash);
formData.append("chunkHash", item.chunkHash);
formData.append("chunk", item.chunk);
return formData;
});
// console.log(formDatas);
const max = 6; //定义最大并发请求数
let index = 0; //定义下标,当前上传第几个
const taskPool = []; //给每一个请求建立请求池
while (index < formDatas.length) {
//发起请求 使用fetch方法发送POST请求,将文件数据作为请求体发送到服务器,并将返回的Promise对象赋值给task变量。
const task = fetch("http://127.0.0.1:3000/upload", {
method: "POST",
body: formDatas[index],
});
taskPool.splice(taskPool.findIndex((item) => item == task));
taskPool.push(task);
//如果请求池的数量等于最大请求数
if (taskPool.length == max) {
await Promise.race(taskPool); //等待任意一个promise对象的状态变为resolve或reject
}
index++;
}
await Promise.all(taskPool);
//发起和并请求(通知服务器合并文件)
mergeRequest();
};
五、后端接收文件
创建临时文件目录,以文件生成的hash值命名,将上传的文件存放到临时目录中,并返回成功的状态。
app.post('/upload', function (req, res) {
const form = new multiparty.Form()
form.parse(req, async (err, fields, files) => {
if (err) {
res.status(401).json({
ok: false,
msg: '上传失败,请重新上传'
})
return
}
const fileHash = fields['fileHash'][0]
const chunkHash = fields['chunkHash'][0]
//创建临时存放目录fileHash命名
const chunkPath = path.resolve(UPLOAD_DIR, fileHash)
if (!fse.existsSync(chunkPath)) {
await fse.mkdir(chunkPath)
}
//将切片放入文件夹里面chunkHash命名
const oldPath = files['chunk'][0]['path']//系统临时存放目录文件
await fse.move(oldPath, path.resolve(chunkPath, chunkHash))
res.status(200).json({
ok: true,
msg: '上传成功'
})
})
})
六、前端发起文件合并请求
//对分片文件进行合并
const mergeRequest = () => {
fetch("http://127.0.0.1:3000/merge", {
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify({
fileHash: fileHash.value,
filName: filName.value,
size: CHUNK_SIZE,
}),
}).then((res) => {
alert("合并成功了!!!");
});
};
七、后端对文件进行合并
查看文件是否存在,如果文件存在就没有必要合并就返回合并成功,节省上传带宽。读取文件目录,对文件目录进行排序操作。排序后对文件进行合并操作
app.post('/merge', async function (req, res) {
const { fileHash, filName, size } = req.body
// console.log(fileHash, filName, size)
//如果已经存在该文件,就没必要合并了
//完整的文件的路径
const filePath = path.resolve(UPLOAD_DIR, fileHash + extractExt(filName))
//如果已经存在该文件,就没必要合并了
if (fse.existsSync(filePath)) {
res.status(200).json({
ok: true,
msg: '合并成功'
})
return
}
//如果不存在该文件,采取合并
const chunkDir = path.resolve(UPLOAD_DIR, fileHash)
if (!fse.existsSync(chunkDir)) {
res.status(401).json({
ok: false,
msg: '合并失败,请重新上传'
})
return
}
//合并操作
const chunkPaths = await fse.readdir(chunkDir)
// console.log(chunkPaths,chunkPaths.length)
//进行排序操作
chunkPaths.sort((a, b) => {
return a.split('-')[1] - b.split('-')[1]
})
console.log(chunkPaths)
const list = chunkPaths.map((chunkName, index) => {
return new Promise(resolve => {
const chunkPath = path.resolve(chunkDir, chunkName)
const readStream = fse.createReadStream(chunkPath)
const writeStream = fse.createWriteStream(filePath, {
start: index * size,
end: (index + 1) * size
})
//读取完成后移出切片
readStream.on('end', async () => {
await fse.unlink(chunkPath)
resolve()
})
readStream.pipe(writeStream)
})
})
await Promise.all(list)
await fse.remove(chunkDir)//删除创建的临时文件夹
res.status(200).json({
ok: true,
msg: '合并成功'
})
})
八、实现秒传操作
在发起合并请求之前发起妙传请求,将hash值传到后端,如果后端存在对应的文件则,返回不用上传的字段,前端接收到字段,不再上传,实现秒传的操作
app.post('/verify', async function (req, res) {
const { fileHash, filName } = req.body
// console.log(fileHash, filName);
const filePath = path.resolve(UPLOAD_DIR, fileHash + extractExt(filName))
//返回服务器已经上传成功的切片
const chunkDir=path.join(UPLOAD_DIR,fileHash)
let chunkPaths=[]
//如果存在对应的临时文件夹才去读取
if(fse.existsSync(chunkDir)){
chunkPaths = await fse.readdir(chunkDir)
console.log(chunkPaths)
}
if (fse.existsSync(filePath)) {
//如果存在就不用上传
res.status(200).json({
ok: true,
shouldUpload: false
})
} else {
//如果不存在重新上传
res.status(200).json({
ok: true,
shouldUpload: true,
existChunks:chunkPaths
})
}
})
九、实现断点续传的操作
在第八步的代码中获取已上传的文件切片,当需要上传文件时,返回已上传文件的列表,前端在上传文件时,过滤已完成的文件
后端
前端
全部代码
前端
<template>
<div>
<h1>大文件上传</h1>
<input type="file" @change="handleUpload" />
</div>
</template>
<script setup>
import SparkMD5 from "spark-md5"; //引入hash计算库
import { ref } from "vue";
//读取本地文件的逻辑
const handleUpload = async (e) => {
// console.log(e.target.files);//伪数组
const files = e.target.files;
if (!files) return;
//读取文件
// console.log(files[0]);
filName.value = files[0].name;
//对文件进行分片操作
const chunks = createChunks(files[0]);
// console.log(chunks);
//hash计算
const hash = await calculateHash(chunks);
// console.log(hash);
fileHash.value = hash;
//校验hash值实现秒传
const data = await verify();
if(!data.shouldUpload){
alert('秒传:上传成功')
return
}
//分片上传
upLoadChunks(chunks,data.existChunks);
};
const CHUNK_SIZE = 1024 * 1024; //定义分片大小 1M
const fileHash = ref(""); //文件hash值
const filName = ref(""); //文件名
// 1M=1024KB=1024*1024B
//文件分片函数
const createChunks = (file) => {
let cur = 0; //分片起始位置
let chunks = []; //定义数组接收分片
while (cur < file.size) {
const blob = file.slice(cur, cur + CHUNK_SIZE); //slice为字节的起始结束位置
chunks.push(blob);
cur += CHUNK_SIZE;
}
return chunks;
};
//hash计算函数
const calculateHash = (chunks) => {
return new Promise((resolve) => {
//1.第一个和最后一个切片全部参与计算
//2.中间的切片只计算前面两个字节,中间两个字节,最后两个字节
const targets = []; //存储所有参与计算的切片
const spark = new SparkMD5.ArrayBuffer();
const fileReader = new FileReader();
chunks.forEach((chunk, index) => {
if (index == 0 || index == chunks.length - 1) {
//1.第一个和最后一个切片全部参与计算
targets.push(chunk);
} else {
//2.中间的切片只计算前面两个字节,中间两个字节,最后两个字节
targets.push(chunk.slice(0, 2)); //前面两个字节
targets.push(chunk.slice(CHUNK_SIZE / 2, CHUNK_SIZE / 2 + 2)); //中间两个字节
targets.push(chunk.slice(CHUNK_SIZE - 2, CHUNK_SIZE)); //最后两个字节
}
});
fileReader.readAsArrayBuffer(new Blob(targets));
fileReader.onload = (e) => {
spark.append(e.target.result);
//拿到计算出来的hash值
// console.log("hash", spark.end());
resolve(spark.end());
};
});
};
//分片上传
const upLoadChunks = async (chunks,existChunks) => {
//浏览器的并发请求数是有限的
const data = chunks.map((chunk, index) => {
return {
fileHash: fileHash.value,
chunkHash: fileHash.value + "-" + index,
chunk,
};
});
//将对象数组转换为formData数组
const formDatas = data
//对已上传的切片进行过滤
.filter((item)=>!existChunks.includes(item.chunkHash))
.map((item) => {
const formData = new FormData();
formData.append("fileHash", item.fileHash);
formData.append("chunkHash", item.chunkHash);
formData.append("chunk", item.chunk);
return formData;
});
// console.log(formDatas);
const max = 6; //定义最大并发请求数
let index = 0; //定义下标,当前上传第几个
const taskPool = []; //给每一个请求建立请求池
while (index < formDatas.length) {
//发起请求 使用fetch方法发送POST请求,将文件数据作为请求体发送到服务器,并将返回的Promise对象赋值给task变量。
const task = fetch("http://127.0.0.1:3000/upload", {
method: "POST",
body: formDatas[index],
});
taskPool.splice(taskPool.findIndex((item) => item == task));
taskPool.push(task);
//如果请求池的数量等于最大请求数
if (taskPool.length == max) {
await Promise.race(taskPool); //等待任意一个promise对象的状态变为resolve或reject
}
index++;
}
await Promise.all(taskPool);
//发起和并请求(通知服务器合并文件)
mergeRequest();
};
//对分片文件进行合并
const mergeRequest = () => {
fetch("http://127.0.0.1:3000/merge", {
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify({
fileHash: fileHash.value,
filName: filName.value,
size: CHUNK_SIZE,
}),
}).then((res) => {
alert("合并成功了!!!");
});
};
// 校验hash值实现秒传
const verify = () => {
return fetch("http://127.0.0.1:3000/verify", {
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify({
fileHash: fileHash.value,
filName: filName.value,
}),
})
.then((res) => {
return res.json();
})
};
</script>
<style lang="scss" scoped>
</style>
后端
const express = require('express')
const path = require('path')
const multiparty = require('multiparty')
const fse = require('fs-extra')
const cors = require('cors')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json())
app.use(cors())
//提取文件后缀名
const extractExt = filename => {
return filename.slice(filename.lastIndexOf('.'), filename.length)
}
const UPLOAD_DIR = path.resolve(__dirname, 'uploads')
app.post('/upload', function (req, res) {
const form = new multiparty.Form()
form.parse(req, async (err, fields, files) => {
if (err) {
res.status(401).json({
ok: false,
msg: '上传失败,请重新上传'
})
return
}
const fileHash = fields['fileHash'][0]
const chunkHash = fields['chunkHash'][0]
//创建临时存放目录fileHash命名
const chunkPath = path.resolve(UPLOAD_DIR, fileHash)
if (!fse.existsSync(chunkPath)) {
await fse.mkdir(chunkPath)
}
//将切片放入文件夹里面chunkHash命名
const oldPath = files['chunk'][0]['path']//系统临时存放目录文件
await fse.move(oldPath, path.resolve(chunkPath, chunkHash))
res.status(200).json({
ok: true,
msg: '上传成功'
})
})
})
app.post('/merge', async function (req, res) {
const { fileHash, filName, size } = req.body
// console.log(fileHash, filName, size)
//如果已经存在该文件,就没必要合并了
//完整的文件的路径
const filePath = path.resolve(UPLOAD_DIR, fileHash + extractExt(filName))
//如果已经存在该文件,就没必要合并了
if (fse.existsSync(filePath)) {
res.status(200).json({
ok: true,
msg: '合并成功'
})
return
}
//如果不存在该文件,采取合并
const chunkDir = path.resolve(UPLOAD_DIR, fileHash)
if (!fse.existsSync(chunkDir)) {
res.status(401).json({
ok: false,
msg: '合并失败,请重新上传'
})
return
}
//合并操作
const chunkPaths = await fse.readdir(chunkDir)
// console.log(chunkPaths,chunkPaths.length)
//进行排序操作
chunkPaths.sort((a, b) => {
return a.split('-')[1] - b.split('-')[1]
})
console.log(chunkPaths)
const list = chunkPaths.map((chunkName, index) => {
return new Promise(resolve => {
const chunkPath = path.resolve(chunkDir, chunkName)
const readStream = fse.createReadStream(chunkPath)
const writeStream = fse.createWriteStream(filePath, {
start: index * size,
end: (index + 1) * size
})
//读取完成后移出切片
readStream.on('end', async () => {
await fse.unlink(chunkPath)
resolve()
})
readStream.pipe(writeStream)
})
})
await Promise.all(list)
await fse.remove(chunkDir)//删除创建的临时文件夹
res.status(200).json({
ok: true,
msg: '合并成功'
})
})
app.post('/verify', async function (req, res) {
const { fileHash, filName } = req.body
// console.log(fileHash, filName);
const filePath = path.resolve(UPLOAD_DIR, fileHash + extractExt(filName))
//返回服务器已经上传成功的切片
const chunkDir=path.join(UPLOAD_DIR,fileHash)
let chunkPaths=[]
//如果存在对应的临时文件夹才去读取
if(fse.existsSync(chunkDir)){
chunkPaths = await fse.readdir(chunkDir)
console.log(chunkPaths)
}
if (fse.existsSync(filePath)) {
//如果存在就不用上传
res.status(200).json({
ok: true,
shouldUpload: false
})
} else {
//如果不存在重新上传
res.status(200).json({
ok: true,
shouldUpload: true,
existChunks:chunkPaths
})
}
})
app.listen(3000, () => {
console.log('Server is running on port 3000')
})