断点续传
在文件上传期间因为一些事情中断(比如网络中断,服务器出错,客户端奔溃),但是在下次上传同一个文件可以从上一次上传的位置继续上传,以节省上传时间。
实现思路
- 将文件上传分为多个切片
- 上传切片
- 合并切片
- 文件切片验证
- 状态的切换
1.文件如何在前端分切片
我们需要为文件生成唯一fileHash
,目的是为了每次上传判断此文件是否之前存在过断点,如果 hash 相同同时存在断点,那么就需要断点续传,如果上传成功过就直接重新上传即可。
使用Blob
对象中的slice
函数可以进行对文件进行切片处理。可以将文件切为自定义的大小和数量.
创建一个createChunl
函数
const createFileChunk = async (file, size = SIZE) => {
const fileChunkList = []
let spark = new SparkMd5.ArrayBuffer()
let readerComplete = false
const reader = new FileReader()
for (let cur = 0; cur < file.size; cur += size) {
let data = { file: file.slice(cur, cur + size) }
fileChunkList.push(data)
reader.readAsArrayBuffer(data.file)
await new Promise((resolve) => {
reader.onload = (e) => {
spark.append(e.target.result)
resolve()
}
})
}
fileHash.value = spark.end()
return fileChunkList
}
spark-md5
是一个高效的 md5 加密算法,详情请看文档spark-md5,通过spark-md5
将文件内容加密为一个 hash 值,用来作为文件上传的唯一标识。
创建一个upload
函数
const handleUpload = async () => {
if (!uploadFile.value) return
loading.value = true
const fileChunkList = await createFileChunk(uploadFile.value)
let vertifyRes = await request({
url: '/vertify',
method: 'post',
data: {
fileHash: fileHash.value,
filename: fileHash.value + '.' + getSuffix(uploadFile.value.name)
}
})
if (!vertifyRes.data.shouldUpload) {
alert(vertifyRes.msg)
uploadPercentage.value = 100
loading.value = false
return
}
data.value = fileChunkList.map(({ file }, index) => ({
chunk: file,
hashPrefix: fileHash.value,
suffix: getSuffix(uploadFile.value.name),
hash: fileHash.value + '-' + index,
index: index,
percentage: 0
}))
await uploadChunk(vertifyRes.data.uploadList)
loading.value = false
}
上传文件切片,首先需要验证 hash 值是否在服务端已经存在如果存在实际上相同的文件已经存在了,当效验未通过时说明不需要再上传了,只有当效验通过是才可以上传文件,验证接口的响应数据中会带有切片的 hash 值,为了是过滤掉已上传的切片提高上传效率。
剩下的前端只需要过滤 hash 的切片,将剩下的切片上传即可
// 上传切片
const uploadChunk = async (uploadList = []) => {
const requestList = data.value
.filter(({ hash }) => !uploadList.includes(hash))
.map(({ chunk, hash, index, hashPrefix }) => {
const formData = new FormData()
formData.append('chunk', chunk)
formData.append('hash', hash)
formData.append('filename', uploadFile.value.name)
formData.append('index', index)
formData.append('hashPrefix', hashPrefix)
return { formData, index }
})
.map(async ({ formData, index }) => {
return request({
url: '/upload_chunk',
data: formData,
onUploadProgress: onPregress.bind(null, data.value[index]),
cancelToken: new axios.CancelToken((c) => {
aborts.value.push(c)
}),
headers: {
'Content-Type': 'multipart/form-data'
},
method: 'post'
})
})
showStopAndResume.value = true
await Promise.all(requestList)
// 合并切片
await mergeRequest()
aborts.value = []
}
这里的mergeRequest
是当所有的切片上传成功后,要告诉后端进行切片合并。
const mergeRequest = async () => {
let res = await request({
url: '/merge_chunk',
headers: {
'Content-Type': 'application/json'
},
method: 'post',
data: {
originName: uploadFile.value.name,
filename: fileHash.value + '.' + getSuffix(uploadFile.value.name),
size: SIZE
}
})
}
这样前端一些核心的代码基本完成了。
2.文件如何在后端实现切片上传
后端暂且使用express
实现
根据前端的要求有以下几个接口需要实现
- /upload_chunk
- /merge_chunk
- /vertify
实现思路
1./upload_chunk
利用fs
相关 api 将上传的缓存 chunk 资源移动到对应目录中
const multipart = new multiparty.Form({
maxFieldsSize: 200 * 1024 * 1024
})
multipart.parse(req, async (err, fields, files) => {
if (err) {
res.send({
code: 500,
msg: '服务器错误'
})
return
}
const [chunk] = files.chunk
const [hash] = fields.hash
const [hashPrefix] = fields.hashPrefix
const chunkDir = path.resolve(UPLOAD_DIR, hashPrefix)
if (!fse.existsSync(chunkDir)) {
await fse.mkdirs(chunkDir)
}
await fse.move(chunk.path, `${chunkDir}/${hash}`)
res.send({
code: 200,
msg: '上传成功'
})
})
2./merge_chunk
将上传的chunk
按照hash-index
序号进行顺序合并,用到比较核心的 api 就是fs.createReadStream()
// 合并切片
const mergeFileChunk = async (filePath, filename, size) => {
const chunkDir = path.resolve(
UPLOAD_DIR,
filename.slice(0, filename.lastIndexOf('.'))
)
const chunkPaths = await fse.readdir(chunkDir)
// 根据切片下标进行排序
// 否则直接读取目录的获取的顺序可能会错乱
chunkPaths.sort((a, b) => a.split('-')[1] - b.split('-')[1])
let pipeP = chunkPaths.map((chunkPath, index) =>
pipeStream(
path.resolve(chunkDir, chunkPath),
fse.createWriteStream(filePath, {
start: index * size,
end: (index + 1) * size
})
)
)
await Promise.all(pipeP)
fse.rmdirSync(chunkDir) // 合并后删除保存的切片目录
}
const pipeStream = (path, writeStream) => {
return new Promise((resolve) => {
const readStream = fse.createReadStream(path)
readStream.on('end', () => {
fse.unlinkSync(path)
resolve()
})
readStream.pipe(writeStream)
})
}
app.post('/merge_chunk', async (req, res) => {
// size是每一个chunk的size
const { filename, size } = req.body
const filePath = path.resolve(UPLOAD_DIR, `${filename}`)
await mergeFileChunk(filePath, filename, size)
res.send({
code: 200,
msg: 'file merged success'
})
})
创建chunk
切片路径的读入流,然后将读入流写入新的地址,当所有切片全部写入完成,完整的文件就合并成功了。
3./vertify
这个 api 比较简单,用来查看当前文件是否存在服务器资源目录中,存在返回shouldUpload
为 false,不存在就需要获取此文件上传的切片数组用于过滤前端不需要上传的切片
const createUploadedList = async (fileHash) =>
fse.existsSync(path.resolve(UPLOAD_DIR, fileHash))
? await fse.readdir(path.resolve(UPLOAD_DIR, fileHash))
: []
app.post('/vertify', async (req, res) => {
const { filename, fileHash } = req.body
const filePath = path.resolve(UPLOAD_DIR, filename)
if (fse.existsSync(filePath)) {
res.send({
code: 200,
data: {
shouldUpload: false
},
msg: '文件已上传成功'
})
return
}
res.send({
code: 200,
data: {
shouldUpload: true,
uploadList: await createUploadedList(fileHash)
},
msg: '请求成功'
})
})
到此,大文件上传基本工作就已经完成了。其中最核心的思想就是切片思想,将大的文件进行唯一标识+切片上传,服务器进行先缓存切片在进行合并操作。保证了断点可续传,大文件稳定上传。