Feat: chunk upload handling for local policy

This commit is contained in:
HFO4
2022-02-27 14:13:39 +08:00
parent c301bd6045
commit 3444b4a75e
17 changed files with 280 additions and 67 deletions

View File

@@ -184,6 +184,8 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser,
// Put 将文件流保存到指定目录
func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
defer file.Close()
opt := &cossdk.ObjectPutOptions{}
_, err := handler.Client.Object.Put(ctx, file.Info().SavePath, file, opt)
return err

View File

@@ -19,6 +19,10 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/util"
)
const (
Perm = 0744
)
// Driver 本地策略适配器
type Driver struct {
Policy *model.Policy
@@ -99,26 +103,61 @@ func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
// 如果目标目录不存在,创建
basePath := filepath.Dir(dst)
if !util.Exists(basePath) {
err := os.MkdirAll(basePath, 0744)
err := os.MkdirAll(basePath, Perm)
if err != nil {
util.Log().Warning("无法创建目录,%s", err)
return err
}
}
// 创建目标文件
out, err := os.Create(dst)
var (
out *os.File
err error
)
if fileInfo.Mode == fsctx.Append {
// 如果是追加模式,则直接打开文件
out, err = os.OpenFile(dst, os.O_APPEND|os.O_CREATE|os.O_WRONLY, Perm)
} else {
// 创建或覆盖目标文件
out, err = os.Create(dst)
}
if err != nil {
util.Log().Warning("无法创建文件,%s", err)
util.Log().Warning("无法打开或创建文件,%s", err)
return err
}
defer out.Close()
if fileInfo.Mode == fsctx.Append {
stat, err := out.Stat()
if err != nil {
util.Log().Warning("无法读取文件信息,%s", err)
return err
}
if uint64(stat.Size()) != fileInfo.AppendStart {
return errors.New("未上传完成的文件分片与预期大小不一致")
}
}
// 写入文件内容
_, err = io.Copy(out, file)
return err
}
func (handler Driver) Truncate(ctx context.Context, src string, size uint64) error {
util.Log().Warning("截断文件 [%s] 至 [%d]", src, size)
out, err := os.OpenFile(src, os.O_WRONLY, Perm)
if err != nil {
util.Log().Warning("无法打开或创建文件,%s", err)
return err
}
defer out.Close()
return out.Truncate(int64(size))
}
// Delete 删除一个或多个文件,
// 返回未删除的文件,及遇到的最后一个错误
func (handler Driver) Delete(ctx context.Context, files []string) ([]string, error) {

View File

@@ -122,6 +122,7 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser,
// Put 将文件流保存到指定目录
func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
defer file.Close()
return handler.Client.Upload(ctx, file)
}

View File

@@ -198,6 +198,7 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser,
// Put 将文件流保存到指定目录
func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
defer file.Close()
// 初始化客户端
if err := handler.InitS3Client(); err != nil {

View File

@@ -50,6 +50,8 @@ func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy)
// Put 将ctx中指定的从机物理文件由从机上传到目标存储策略
func (d *Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
defer file.Close()
src, ok := ctx.Value(fsctx.SlaveSrcPath).(string)
if !ok {
return ErrSlaveSrcPathNotExist

View File

@@ -213,11 +213,7 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) {
callbackSession := callbackSessionRaw.(*serializer.UploadSession)
// 重新指向上传策略
policy, err := model.GetPolicyByID(callbackSession.PolicyID)
if err != nil {
return nil, err
}
fs.Policy = &policy
fs.Policy = &callbackSession.Policy
fs.User.Policy = policy
err = fs.DispatchHandler()

View File

@@ -15,14 +15,6 @@ const (
Nop
)
// FileHeader 上传来的文件数据处理器
type FileHeader interface {
io.Reader
io.Closer
Info() *UploadTaskInfo
SetSize(uint64)
}
type UploadTaskInfo struct {
Size uint64
MIMEType string
@@ -33,6 +25,17 @@ type UploadTaskInfo struct {
LastModified *time.Time
SavePath string
UploadSessionID *string
AppendStart uint64
Model interface{}
}
// FileHeader 上传来的文件数据处理器
type FileHeader interface {
io.Reader
io.Closer
Info() *UploadTaskInfo
SetSize(uint64)
SetModel(fileModel interface{})
}
// FileStream 用户传来的文件
@@ -47,6 +50,8 @@ type FileStream struct {
MIMEType string
SavePath string
UploadSessionID *string
AppendStart uint64
Model interface{}
}
func (file *FileStream) Read(p []byte) (n int, err error) {
@@ -68,9 +73,15 @@ func (file *FileStream) Info() *UploadTaskInfo {
LastModified: file.LastModified,
SavePath: file.SavePath,
UploadSessionID: file.UploadSessionID,
AppendStart: file.AppendStart,
Model: file.Model,
}
}
func (file *FileStream) SetSize(size uint64) {
file.Size = size
}
func (file *FileStream) SetModel(fileModel interface{}) {
file.Model = fileModel
}

View File

@@ -0,0 +1 @@
package taskinfo

View File

@@ -8,7 +8,9 @@ import (
"sync"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cache"
"github.com/cloudreve/Cloudreve/v3/pkg/conf"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/local"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
@@ -223,25 +225,13 @@ func GenericAfterUpdate(ctx context.Context, fs *FileSystem, newFile fsctx.FileH
return ErrObjectNotExist
}
fs.SetTargetFile(&[]model.File{originFile})
newFile.SetModel(&originFile)
err := originFile.UpdateSize(newFile.Info().Size)
if err != nil {
return err
}
// 尝试清空原有缩略图并重新生成
if originFile.GetPolicy().IsThumbGenerateNeeded() {
fs.recycleLock.Lock()
go func() {
defer fs.recycleLock.Unlock()
if originFile.PicInfo != "" {
_, _ = fs.Handler.Delete(ctx, []string{originFile.SourceName + conf.ThumbConfig.FileSuffix})
fs.GenerateThumbnail(ctx, &originFile)
}
}()
}
return nil
}
@@ -302,17 +292,23 @@ func GenericAfterUpload(ctx context.Context, fs *FileSystem, fileHeader fsctx.Fi
if err != nil {
return ErrInsertFileRecord
}
fs.SetTargetFile(&[]model.File{*file})
fileHeader.SetModel(file)
return nil
}
// HookGenerateThumb 生成缩略图
func HookGenerateThumb(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error {
// 异步尝试生成缩略图
fileMode := fileHeader.Info().Model.(*model.File)
if fs.Policy.IsThumbGenerateNeeded() {
fs.recycleLock.Lock()
go func() {
defer fs.recycleLock.Unlock()
fs.GenerateThumbnail(ctx, file)
_, _ = fs.Handler.Delete(ctx, []string{fileMode.SourceName + conf.ThumbConfig.FileSuffix})
fs.GenerateThumbnail(ctx, fileMode)
}()
}
return nil
}
@@ -321,3 +317,40 @@ func HookClearFileHeaderSize(ctx context.Context, fs *FileSystem, fileHeader fsc
fileHeader.SetSize(0)
return nil
}
// HookTruncateFileTo 将物理文件截断至 size
func HookTruncateFileTo(size uint64) Hook {
return func(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error {
if fs.Policy.Type == "local" {
if driver, ok := fs.Handler.(local.Driver); ok {
return driver.Truncate(ctx, fileHeader.Info().SavePath, size)
}
}
return nil
}
}
// HookChunkUploadFinished 单个分片上传结束后
func HookChunkUploaded(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error {
fileInfo := fileHeader.Info()
// 更新文件大小
return fileInfo.Model.(*model.File).UpdateSize(fileInfo.Model.(*model.File).GetSize() + fileInfo.Size)
}
// HookChunkUploadFinished 分片上传结束后处理文件
func HookChunkUploadFinished(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error {
fileInfo := fileHeader.Info()
fileModel := fileInfo.Model.(*model.File)
return fileModel.PopChunkToFile(fileInfo.LastModified)
}
// HookChunkUploadFinished 分片上传结束后处理文件
func HookDeleteUploadSession(id string) Hook {
return func(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error {
cache.Deletes([]string{id}, UploadSessionCachePrefix)
return nil
}
}

View File

@@ -22,7 +22,8 @@ import (
*/
const (
UploadSessionMetaKey = "upload_session"
UploadSessionMetaKey = "upload_session"
UploadSessionCachePrefix = "callback_"
)
// Upload 上传文件
@@ -36,13 +37,15 @@ func (fs *FileSystem) Upload(ctx context.Context, file *fsctx.FileStream) (err e
// 生成文件名和路径,
var savePath string
// 如果是更新操作就从上下文中获取
if originFile, ok := ctx.Value(fsctx.FileModelCtx).(model.File); ok {
savePath = originFile.SourceName
} else {
savePath = fs.GenerateSavePath(ctx, file)
if file.SavePath == "" {
// 如果是更新操作就从上下文中获取
if originFile, ok := ctx.Value(fsctx.FileModelCtx).(model.File); ok {
savePath = originFile.SourceName
} else {
savePath = fs.GenerateSavePath(ctx, file)
}
file.SavePath = savePath
}
file.SavePath = savePath
// 处理客户端未完成上传时,关闭连接
go fs.CancelUpload(ctx, savePath, file)
@@ -70,14 +73,15 @@ func (fs *FileSystem) Upload(ctx context.Context, file *fsctx.FileStream) (err e
return err
}
fileInfo := file.Info()
util.Log().Info(
"新文件PUT:%s , 大小:%d, 上传者:%s",
fileInfo.FileName,
fileInfo.Size,
fs.User.Nick,
)
if file.Mode == fsctx.Create {
fileInfo := file.Info()
util.Log().Info(
"新文件PUT:%s , 大小:%d, 上传者:%s",
fileInfo.FileName,
fileInfo.Size,
fs.User.Nick,
)
}
return nil
}
@@ -159,6 +163,7 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS
callBackSessionTTL := model.GetIntSetting("upload_session_timeout", 86400)
callbackKey := uuid.Must(uuid.NewV4()).String()
fileSize := file.Size
// 创建占位的文件,同时校验文件信息
file.Mode = fsctx.Nop
@@ -177,12 +182,11 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS
uploadSession := &serializer.UploadSession{
Key: callbackKey,
UID: fs.User.ID,
PolicyID: fs.Policy.ID,
Policy: *fs.Policy,
VirtualPath: file.VirtualPath,
Name: file.Name,
Size: file.Size,
Size: fileSize,
SavePath: file.SavePath,
ChunkSize: fs.Policy.OptionsSerialized.ChunkSize,
LastModified: file.LastModified,
}
@@ -194,7 +198,7 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS
// 创建回调会话
err = cache.Set(
"callback_"+callbackKey,
UploadSessionCachePrefix+callbackKey,
uploadSession,
callBackSessionTTL,
)
@@ -218,6 +222,7 @@ func (fs *FileSystem) UploadFromStream(ctx context.Context, file *fsctx.FileStre
fs.Use("AfterUploadCanceled", HookDeleteTempFile)
fs.Use("AfterUploadCanceled", HookGiveBackCapacity)
fs.Use("AfterUpload", GenericAfterUpload)
fs.Use("AfterUpload", HookGenerateThumb)
fs.Use("AfterValidateFailed", HookDeleteTempFile)
fs.Use("AfterValidateFailed", HookGiveBackCapacity)
fs.Use("AfterUploadFailed", HookGiveBackCapacity)

View File

@@ -248,9 +248,6 @@ func (instance NopRSCloser) Seek(offset int64, whence int) (int64, error) {
// BlackHole 将客户端发来的数据放入黑洞
func BlackHole(r io.Reader) {
if !model.IsTrueVal(model.GetSettingByName("reset_after_upload_failed")) {
_, err := io.Copy(ioutil.Discard, r)
if err != nil {
util.Log().Debug("黑洞数据出错,%s", err)
}
io.Copy(ioutil.Discard, r)
}
}

View File

@@ -72,6 +72,12 @@ const (
CodeAdminRequired = 40008
// CodeMasterNotFound 主机节点未注册
CodeMasterNotFound = 40009
// CodeUploadSessionExpired 上传会话已过期
CodeUploadSessionExpired = 400011
// CodeInvalidChunkIndex 无效的分片序号
CodeInvalidChunkIndex = 400012
// CodeInvalidContentLength 无效的正文长度
CodeInvalidContentLength = 400013
// CodeDBError 数据库操作失败
CodeDBError = 50001
// CodeEncryptError 加密失败

View File

@@ -4,6 +4,7 @@ import (
"encoding/base64"
"encoding/gob"
"encoding/json"
model "github.com/cloudreve/Cloudreve/v3/models"
"time"
)
@@ -34,15 +35,14 @@ type UploadCredential struct {
// UploadSession 上传会话
type UploadSession struct {
Key string // 上传会话 GUID
UID uint // 发起者
PolicyID uint
Key string // 上传会话 GUID
UID uint // 发起者
VirtualPath string // 用户文件路径,不含文件名
Name string // 文件名
Size uint64 // 文件大小
SavePath string // 物理存储路径,包含物理文件名
ChunkSize uint64 // 分块大小0 为不分快
LastModified *time.Time // 可选的文件最后修改日期
Policy model.Policy
}
// UploadCallback 上传回调正文

View File

@@ -367,6 +367,7 @@ func (h *Handler) handlePut(w http.ResponseWriter, r *http.Request, fs *filesyst
fs.Use("AfterUploadCanceled", filesystem.HookGiveBackCapacity)
fs.Use("AfterUploadCanceled", filesystem.HookCancelContext)
fs.Use("AfterUpload", filesystem.GenericAfterUpload)
fs.Use("AfterUpload", filesystem.HookGenerateThumb)
fs.Use("AfterValidateFailed", filesystem.HookDeleteTempFile)
fs.Use("AfterValidateFailed", filesystem.HookGiveBackCapacity)
fs.Use("AfterUploadFailed", filesystem.HookGiveBackCapacity)
@@ -381,7 +382,7 @@ func (h *Handler) handlePut(w http.ResponseWriter, r *http.Request, fs *filesyst
return http.StatusMethodNotAllowed, err
}
etag, err := findETag(ctx, fs, h.LockSystem[fs.User.ID], reqPath, &fs.FileTarget[0])
etag, err := findETag(ctx, fs, h.LockSystem[fs.User.ID], reqPath, fileData.Model.(*model.File))
if err != nil {
return http.StatusInternalServerError, err
}