Feat: creating upload session and credential from master server

This commit is contained in:
HFO4
2022-02-28 17:52:59 +08:00
parent 118d738797
commit 7214e59c25
12 changed files with 153 additions and 136 deletions

View File

@@ -374,7 +374,7 @@ func (service *FileIDService) PutContent(ctx context.Context, c *gin.Context) se
MIMEType: c.Request.Header.Get("Content-Type"),
File: c.Request.Body,
Size: fileSize,
Model: fsctx.Overwrite,
Mode: fsctx.Overwrite,
}
// 创建文件系统

View File

@@ -4,8 +4,8 @@ import (
"context"
"fmt"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/cache"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/hashid"
@@ -72,11 +72,11 @@ type UploadService struct {
Index int `uri:"index" form:"index" binding:"min=0"`
}
// Upload 处理本机文件分片上传
func (service *UploadService) Upload(ctx context.Context, c *gin.Context) serializer.Response {
// LocalUpload 处理本机文件分片上传
func (service *UploadService) LocalUpload(ctx context.Context, c *gin.Context) serializer.Response {
uploadSessionRaw, ok := cache.Get(filesystem.UploadSessionCachePrefix + service.ID)
if !ok {
return serializer.Err(serializer.CodeUploadSessionExpired, "Upload session expired or not exist", nil)
return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session expired or not exist", nil)
}
uploadSession := uploadSessionRaw.(serializer.UploadSession)
@@ -87,13 +87,13 @@ func (service *UploadService) Upload(ctx context.Context, c *gin.Context) serial
}
if uploadSession.UID != fs.User.ID {
return serializer.Err(serializer.CodeUploadSessionExpired, "Upload session expired or not exist", nil)
return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session expired or not exist", nil)
}
// 查找上传会话创建的占位文件
file, err := model.GetFilesByUploadSession(service.ID, fs.User.ID)
if err != nil {
return serializer.Err(serializer.CodeUploadSessionExpired, "Upload session file placeholder not exist", err)
return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session file placeholder not exist", err)
}
// 重设 fs 存储策略
@@ -120,10 +120,34 @@ func (service *UploadService) Upload(ctx context.Context, c *gin.Context) serial
util.Log().Info("尝试上传覆盖分片[%d] Start=%d", service.Index, actualSizeStart)
}
return processChunkUpload(ctx, c, fs, &uploadSession, service.Index, file)
return processChunkUpload(ctx, c, fs, &uploadSession, service.Index, file, fsctx.Append|fsctx.Overwrite)
}
func processChunkUpload(ctx context.Context, c *gin.Context, fs *filesystem.FileSystem, session *serializer.UploadSession, index int, file *model.File) serializer.Response {
// SlaveUpload 处理从机文件分片上传
func (service *UploadService) SlaveUpload(ctx context.Context, c *gin.Context) serializer.Response {
uploadSessionRaw, ok := cache.Get(filesystem.UploadSessionCachePrefix + service.ID)
if !ok {
return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session expired or not exist", nil)
}
uploadSession := uploadSessionRaw.(serializer.UploadSession)
fs, err := filesystem.NewAnonymousFileSystem()
if err != nil {
return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)
}
// 解析需要的参数
service.Index, _ = strconv.Atoi(c.Query("chunk"))
mode := fsctx.Append
if c.GetHeader(auth.CrHeaderPrefix+"Overwrite") == "true" {
mode |= fsctx.Overwrite
}
return processChunkUpload(ctx, c, fs, &uploadSession, service.Index, nil, mode)
}
func processChunkUpload(ctx context.Context, c *gin.Context, fs *filesystem.FileSystem, session *serializer.UploadSession, index int, file *model.File, mode fsctx.WriteMode) serializer.Response {
// 取得并校验文件大小是否符合分片要求
chunkSize := session.Policy.OptionsSerialized.ChunkSize
isLastChunk := session.Policy.OptionsSerialized.ChunkSize == 0 || uint64(index+1)*chunkSize >= session.Size
@@ -148,23 +172,30 @@ func processChunkUpload(ctx context.Context, c *gin.Context, fs *filesystem.File
Name: session.Name,
VirtualPath: session.VirtualPath,
SavePath: session.SavePath,
Mode: fsctx.Append | fsctx.Overwrite,
Mode: mode,
AppendStart: chunkSize * uint64(index),
Model: file,
LastModified: session.LastModified,
}
// 给文件系统分配钩子
fs.Use("BeforeUpload", filesystem.HookValidateCapacity)
fs.Use("AfterUploadCanceled", filesystem.HookTruncateFileTo(fileData.AppendStart))
fs.Use("AfterUpload", filesystem.HookChunkUploaded)
if isLastChunk {
fs.Use("AfterUpload", filesystem.HookChunkUploadFinished)
fs.Use("AfterUpload", filesystem.HookGenerateThumb)
fs.Use("AfterUpload", filesystem.HookDeleteUploadSession(session.Key))
}
fs.Use("AfterValidateFailed", filesystem.HookTruncateFileTo(fileData.AppendStart))
fs.Use("AfterValidateFailed", filesystem.HookChunkUploadFailed)
if file != nil {
fs.Use("BeforeUpload", filesystem.HookValidateCapacity)
fs.Use("AfterUpload", filesystem.HookChunkUploaded)
fs.Use("AfterValidateFailed", filesystem.HookChunkUploadFailed)
if isLastChunk {
fs.Use("AfterUpload", filesystem.HookChunkUploadFinished)
fs.Use("AfterUpload", filesystem.HookGenerateThumb)
fs.Use("AfterUpload", filesystem.HookDeleteUploadSession(session.Key))
}
} else {
if isLastChunk {
fs.Use("AfterUpload", filesystem.SlaveAfterUpload)
}
}
// 执行上传
uploadCtx := context.WithValue(ctx, fsctx.GinCtx, c)
@@ -193,7 +224,7 @@ func (service *UploadSessionService) Delete(ctx context.Context, c *gin.Context)
// 查找需要删除的上传会话的占位文件
file, err := model.GetFilesByUploadSession(service.ID, fs.User.ID)
if err != nil {
return serializer.Err(serializer.CodeUploadSessionExpired, "Upload session file placeholder not exist", err)
return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session file placeholder not exist", err)
}
// 删除文件