Feat: process upload callback sent from slave node

This commit is contained in:
HFO4
2022-03-03 19:17:25 +08:00
parent 4925a356e3
commit e0714fdd53
11 changed files with 222 additions and 235 deletions

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"errors"
"fmt"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
@@ -437,14 +438,12 @@ func RemoteCallback(url string, body serializer.UploadCallback) error {
}
// 解析回调服务端响应
resp = resp.CheckHTTPResponse(200)
if resp.Err != nil {
return serializer.NewError(serializer.CodeCallbackError, "主机服务器返回异常响应", resp.Err)
}
response, err := resp.DecodeResponse()
if err != nil {
return serializer.NewError(serializer.CodeCallbackError, "从机无法解析主机返回的响应", err)
msg := fmt.Sprintf("从机无法解析主机返回的响应 (StatusCode=%d)", resp.Response.StatusCode)
return serializer.NewError(serializer.CodeCallbackError, msg, err)
}
if response.Code != 0 {
return serializer.NewError(response.Code, response.Msg, errors.New(response.Error))
}

View File

@@ -45,7 +45,7 @@ func NewDriver(policy *model.Policy) (*Driver, error) {
}
// List 列取文件
func (handler Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
func (handler *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
var res []response.Object
reqBody := serializer.ListRequest{
@@ -87,7 +87,7 @@ func (handler Driver) List(ctx context.Context, path string, recursive bool) ([]
}
// getAPIUrl 获取接口请求地址
func (handler Driver) getAPIUrl(scope string, routes ...string) string {
func (handler *Driver) getAPIUrl(scope string, routes ...string) string {
serverURL, err := url.Parse(handler.Policy.Server)
if err != nil {
return ""
@@ -113,7 +113,7 @@ func (handler Driver) getAPIUrl(scope string, routes ...string) string {
}
// Get 获取文件内容
func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
func (handler *Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
// 尝试获取速度限制
speedLimit := 0
if user, ok := ctx.Value(fsctx.UserCtx).(model.User); ok {
@@ -150,7 +150,7 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser,
}
// Put 将文件流保存到指定目录
func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
func (handler *Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
defer file.Close()
// 凭证有效期
@@ -206,7 +206,7 @@ func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
// Delete 删除一个或多个文件,
// 返回未删除的文件,及遇到的最后一个错误
func (handler Driver) Delete(ctx context.Context, files []string) ([]string, error) {
func (handler *Driver) Delete(ctx context.Context, files []string) ([]string, error) {
// 封装接口请求正文
reqBody := serializer.RemoteDeleteRequest{
Files: files,
@@ -252,7 +252,7 @@ func (handler Driver) Delete(ctx context.Context, files []string) ([]string, err
}
// Thumb 获取文件缩略图
func (handler Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
func (handler *Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
sourcePath := base64.RawURLEncoding.EncodeToString([]byte(path))
thumbURL := handler.getAPIUrl("thumb") + "/" + sourcePath
ttl := model.GetIntSetting("preview_timeout", 60)
@@ -268,7 +268,7 @@ func (handler Driver) Thumb(ctx context.Context, path string) (*response.Content
}
// Source 获取外链URL
func (handler Driver) Source(
func (handler *Driver) Source(
ctx context.Context,
path string,
baseURL url.URL,
@@ -322,9 +322,9 @@ func (handler Driver) Source(
}
// Token 获取上传策略和认证Token
func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) {
func (handler *Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) {
siteURL := model.GetSiteURL()
apiBaseURI, _ := url.Parse(path.Join("/api/v3/callback/remote" + uploadSession.Key + uploadSession.CallbackSecret))
apiBaseURI, _ := url.Parse(path.Join("/api/v3/callback/remote", uploadSession.Key, uploadSession.CallbackSecret))
apiURL := siteURL.ResolveReference(apiBaseURI)
// 在从机端创建上传会话
@@ -347,7 +347,7 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria
}, nil
}
func (handler Driver) getUploadCredential(ctx context.Context, policy serializer.UploadPolicy, TTL int64) (serializer.UploadCredential, error) {
func (handler *Driver) getUploadCredential(ctx context.Context, policy serializer.UploadPolicy, TTL int64) (serializer.UploadCredential, error) {
policyEncoded, err := policy.EncodeUploadPolicy()
if err != nil {
return serializer.UploadCredential{}, err
@@ -371,6 +371,6 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy serializer
}
// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
func (handler *Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}

View File

@@ -207,7 +207,7 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) {
}
// 获取回调会话
callbackSessionRaw, ok := c.Get("callbackSession")
callbackSessionRaw, ok := c.Get(UploadSessionCtx)
if !ok {
return nil, errors.New("找不到回调会话")
}

View File

@@ -194,9 +194,7 @@ func SlaveAfterUpload(session *serializer.UploadSession) Hook {
// 发送回调请求
callbackBody := serializer.UploadCallback{
SourceName: file.SourceName,
PicInfo: file.PicInfo,
Size: fileInfo.Size,
PicInfo: file.PicInfo,
}
return cluster.RemoteCallback(session.Callback, callbackBody)
@@ -287,12 +285,13 @@ func HookChunkUploadFailed(ctx context.Context, fs *FileSystem, fileHeader fsctx
return fileInfo.Model.(*model.File).UpdateSize(fileInfo.AppendStart)
}
// HookChunkUploadFinished 分片上传结束后处理文件
func HookChunkUploadFinished(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error {
fileInfo := fileHeader.Info()
fileModel := fileInfo.Model.(*model.File)
return fileModel.PopChunkToFile(fileInfo.LastModified)
// HookPopPlaceholderToFile 将占位文件提升为正式文件
func HookPopPlaceholderToFile(picInfo string) Hook {
return func(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error {
fileInfo := fileHeader.Info()
fileModel := fileInfo.Model.(*model.File)
return fileModel.PopChunkToFile(fileInfo.LastModified, picInfo)
}
}
// HookChunkUploadFinished 分片上传结束后处理文件

View File

@@ -23,6 +23,8 @@ import (
const (
UploadSessionMetaKey = "upload_session"
UploadSessionCtx = "uploadSession"
UserCtx = "user"
UploadSessionCachePrefix = "callback_"
)
@@ -47,11 +49,11 @@ func (fs *FileSystem) Upload(ctx context.Context, file *fsctx.FileStream) (err e
file.SavePath = savePath
}
// 处理客户端未完成上传时,关闭连接
go fs.CancelUpload(ctx, savePath, file)
// 保存文件
if file.Mode&fsctx.Nop != fsctx.Nop {
// 处理客户端未完成上传时,关闭连接
go fs.CancelUpload(ctx, savePath, file)
err = fs.Handler.Put(ctx, file)
if err != nil {
fs.Trigger(ctx, "AfterUploadFailed", file)
@@ -202,7 +204,7 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS
// 创建回调会话
err = cache.Set(
UploadSessionCachePrefix+callbackKey,
uploadSession,
*uploadSession,
callBackSessionTTL,
)
if err != nil {

View File

@@ -51,9 +51,7 @@ type UploadSession struct {
// UploadCallback 上传回调正文
type UploadCallback struct {
SourceName string `json:"source_name"`
PicInfo string `json:"pic_info"`
Size uint64 `json:"size"`
PicInfo string `json:"pic_info"`
}
// GeneralUploadCallbackFailed 存储策略上传回调失败响应