Feat: decompression task

This commit is contained in:
HFO4
2020-02-03 13:23:33 +08:00
parent e722c33cd5
commit 91e202c7e6
19 changed files with 440 additions and 52 deletions

View File

@@ -10,7 +10,10 @@ import (
"github.com/gin-gonic/gin"
"io"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
)
@@ -148,7 +151,7 @@ func (fs *FileSystem) doCompress(ctx context.Context, file *model.File, folder *
// 创建压缩文件头
header := &zip.FileHeader{
Name: filepath.Join(file.Position, file.Name),
Name: filepath.FromSlash(path.Join(file.Position, file.Name)),
Modified: file.UpdatedAt,
UncompressedSize64: file.Size,
}
@@ -185,3 +188,117 @@ func (fs *FileSystem) doCompress(ctx context.Context, file *model.File, folder *
}
}
}
// Decompress 解压缩给定压缩文件到dst目录
func (fs *FileSystem) Decompress(ctx context.Context, src, dst string) error {
err := fs.resetFileIfNotExist(ctx, src)
if err != nil {
return err
}
tempZipFilePath := ""
defer func() {
// 结束时删除临时压缩文件
if tempZipFilePath != "" {
if err := os.Remove(tempZipFilePath); err != nil {
util.Log().Warning("无法删除临时压缩文件 %s , %s", tempZipFilePath, err)
}
}
}()
// 下载压缩文件到临时目录
fileStream, err := fs.Handler.Get(ctx, fs.FileTarget[0].SourceName)
if err != nil {
return err
}
tempZipFilePath = filepath.Join(
model.GetSettingByName("temp_path"),
"decompress",
fmt.Sprintf("archive_%d.zip", time.Now().UnixNano()),
)
zipFile, err := util.CreatNestedFile(tempZipFilePath)
if err != nil {
util.Log().Warning("无法创建临时压缩文件 %s , %s", tempZipFilePath, err)
tempZipFilePath = ""
return err
}
defer zipFile.Close()
_, err = io.Copy(zipFile, fileStream)
if err != nil {
util.Log().Warning("无法写入临时压缩文件 %s , %s", tempZipFilePath, err)
return err
}
zipFile.Close()
// 解压缩文件
r, err := zip.OpenReader(tempZipFilePath)
if err != nil {
return err
}
defer r.Close()
// 重设存储策略
fs.Policy = &fs.User.Policy
err = fs.DispatchHandler()
if err != nil {
return err
}
var wg sync.WaitGroup
parallel := model.GetIntSetting("max_parallel_transfer", 4)
worker := make(chan int, parallel)
for i := 0; i < parallel; i++ {
worker <- i
}
for _, f := range r.File {
rawPath := util.FormSlash(f.Name)
savePath := path.Join(dst, rawPath)
// 路径是否合法
if !strings.HasPrefix(savePath, path.Clean(dst)+"/") {
return fmt.Errorf("%s: illegal file path", f.Name)
}
// 如果是目录
if f.FileInfo().IsDir() {
fs.CreateDirectory(ctx, savePath)
continue
}
// 上传文件
fileStream, err := f.Open()
if err != nil {
util.Log().Warning("无法打开压缩包内文件%s , %s , 跳过", rawPath, err)
continue
}
select {
case <-worker:
go func(fileStream io.ReadCloser, size int64) {
wg.Add(1)
defer func() {
worker <- 1
wg.Done()
if err := recover(); err != nil {
util.Log().Warning("上传压缩包内文件时出错")
fmt.Println(err)
}
}()
err = fs.UploadFromStream(ctx, fileStream, savePath, uint64(size))
fileStream.Close()
if err != nil {
util.Log().Debug("无法上传压缩包内的文件%s , %s , 跳过", rawPath, err)
}
}(fileStream, f.FileInfo().Size())
}
}
wg.Wait()
return nil
}

View File

@@ -180,7 +180,7 @@ func (client *Client) UploadChunk(ctx context.Context, uploadURL string, chunk *
func (client *Client) Upload(ctx context.Context, dst string, size int, file io.Reader) error {
// 小文件,使用简单上传接口上传
if size <= int(SmallFileSize) {
_, err := client.SimpleUpload(ctx, dst, file)
_, err := client.SimpleUpload(ctx, dst, file, int64(size))
return err
}
@@ -248,11 +248,11 @@ func (client *Client) DeleteUploadSession(ctx context.Context, uploadURL string)
}
// SimpleUpload 上传小文件到dst
func (client *Client) SimpleUpload(ctx context.Context, dst string, body io.Reader) (*UploadResult, error) {
func (client *Client) SimpleUpload(ctx context.Context, dst string, body io.Reader, size int64) (*UploadResult, error) {
dst = strings.TrimPrefix(dst, "/")
requestURL := client.getRequestURL("me/drive/root:/" + dst + ":/content")
res, err := client.request(ctx, "PUT", requestURL, body)
res, err := client.request(ctx, "PUT", requestURL, body, request.WithContentLength(int64(size)))
if err != nil {
return nil, err
}
@@ -381,7 +381,7 @@ func (client *Client) MonitorUpload(uploadURL, callbackKey, path string, size ui
case <-time.After(time.Duration(ttl) * time.Second):
// 上传会话到期,仍未完成上传,创建占位符
client.DeleteUploadSession(context.Background(), uploadURL)
_, err := client.SimpleUpload(context.Background(), path, strings.NewReader(""))
_, err := client.SimpleUpload(context.Background(), path, strings.NewReader(""), 0)
if err != nil {
util.Log().Debug("无法创建占位文件,%s", err)
}
@@ -428,7 +428,7 @@ func (client *Client) MonitorUpload(uploadURL, callbackKey, path string, size ui
// 取消上传会话实测OneDrive取消上传会话后客户端还是可以上传
// 所以上传一个空文件占位,阻止客户端上传
client.DeleteUploadSession(context.Background(), uploadURL)
_, err := client.SimpleUpload(context.Background(), path, strings.NewReader(""))
_, err := client.SimpleUpload(context.Background(), path, strings.NewReader(""), 0)
if err != nil {
util.Log().Debug("无法创建占位文件,%s", err)
}

View File

@@ -12,6 +12,7 @@ var (
ErrInsufficientCapacity = errors.New("容量空间不足")
ErrIllegalObjectName = errors.New("目标名称非法")
ErrClientCanceled = errors.New("客户端取消操作")
ErrRootProtected = errors.New("无法对根目录进行操作")
ErrInsertFileRecord = serializer.NewError(serializer.CodeDBError, "无法插入文件记录", nil)
ErrFileExisted = serializer.NewError(serializer.CodeObjectExist, "同名文件或目录已存在", nil)
ErrFolderExisted = serializer.NewError(serializer.CodeObjectExist, "同名目录已存在", nil)

View File

@@ -78,6 +78,8 @@ type FileSystem struct {
DirTarget []model.Folder
// 相对根目录
Root *model.Folder
// 互斥锁
Lock sync.Mutex
/*
钩子函数
@@ -110,6 +112,7 @@ func (fs *FileSystem) reset() {
fs.Hooks = nil
fs.Handler = nil
fs.Root = nil
fs.Lock = sync.Mutex{}
}
// NewFileSystem 初始化一个文件系统

View File

@@ -31,4 +31,6 @@ const (
ShareKeyCtx
// LimitParentCtx 限制父目录
LimitParentCtx
// IgnoreConflictCtx 忽略重名冲突
IgnoreConflictCtx
)

View File

@@ -28,6 +28,15 @@ func (fs *FileSystem) Use(name string, hook Hook) {
fs.Hooks[name] = []Hook{hook}
}
// CleanHooks 清空钩子,name为空表示全部清空
func (fs *FileSystem) CleanHooks(name string) {
if name == "" {
fs.Hooks = nil
} else {
delete(fs.Hooks, name)
}
}
// Trigger 触发钩子,遇到第一个错误时
// 返回错误,后续钩子不会继续执行
func (fs *FileSystem) Trigger(ctx context.Context, name string) error {
@@ -247,10 +256,14 @@ func GenericAfterUpload(ctx context.Context, fs *FileSystem) error {
// 文件存放的虚拟路径
virtualPath := ctx.Value(fsctx.FileHeaderCtx).(FileHeader).GetVirtualPath()
// 检查路径是否存在
// 检查路径是否存在,不存在就创建
isExist, folder := fs.IsPathExist(virtualPath)
if !isExist {
return ErrPathNotExist
newFolder, err := fs.CreateDirectory(ctx, virtualPath)
if err != nil {
return err
}
folder = newFolder
}
// 检查文件是否存在

View File

@@ -328,8 +328,12 @@ func (fs *FileSystem) List(ctx context.Context, dirPath string, pathProcessor fu
return objects, nil
}
// CreateDirectory 根据给定的完整创建目录,不会递归创建
func (fs *FileSystem) CreateDirectory(ctx context.Context, fullPath string) error {
// CreateDirectory 根据给定的完整创建目录,支持递归创建
func (fs *FileSystem) CreateDirectory(ctx context.Context, fullPath string) (*model.Folder, error) {
if fullPath == "/" {
return nil, ErrRootProtected
}
// 获取要创建目录的父路径和目录名
fullPath = path.Clean(fullPath)
base := path.Dir(fullPath)
@@ -340,18 +344,26 @@ func (fs *FileSystem) CreateDirectory(ctx context.Context, fullPath string) erro
// 检查目录名是否合法
if !fs.ValidateLegalName(ctx, dir) {
return ErrIllegalObjectName
return nil, ErrIllegalObjectName
}
// 父目录是否存在
isExist, parent := fs.IsPathExist(base)
if !isExist {
return ErrPathNotExist
// 递归创建父目录
if _, ok := ctx.Value(fsctx.IgnoreConflictCtx).(bool); !ok {
ctx = context.WithValue(ctx, fsctx.IgnoreConflictCtx, true)
}
newParent, err := fs.CreateDirectory(ctx, base)
if err != nil {
return nil, err
}
parent = newParent
}
// 是否有同名文件
if ok, _ := fs.IsChildFileExist(parent, dir); ok {
return ErrFileExisted
return nil, ErrFileExisted
}
// 创建目录
@@ -363,9 +375,12 @@ func (fs *FileSystem) CreateDirectory(ctx context.Context, fullPath string) erro
_, err := newFolder.Create()
if err != nil {
return ErrFolderExisted
if _, ok := ctx.Value(fsctx.IgnoreConflictCtx).(bool); !ok {
return nil, ErrFolderExisted
}
}
return nil
return &newFolder, nil
}
// SaveTo 将别人分享的文件转存到目标路径下

View File

@@ -146,12 +146,12 @@ func TestFileSystem_CreateDirectory(t *testing.T) {
ctx := context.Background()
// 目录名非法
err := fs.CreateDirectory(ctx, "/ad/a+?")
_, err := fs.CreateDirectory(ctx, "/ad/a+?")
asserts.Equal(ErrIllegalObjectName, err)
// 父目录不存在
mock.ExpectQuery("SELECT(.+)folders").WillReturnRows(sqlmock.NewRows([]string{"id", "name"}))
err = fs.CreateDirectory(ctx, "/ad/ab")
_, err = fs.CreateDirectory(ctx, "/ad/ab")
asserts.Equal(ErrPathNotExist, err)
asserts.NoError(mock.ExpectationsWereMet())
@@ -166,7 +166,7 @@ func TestFileSystem_CreateDirectory(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{"id", "owner_id"}).AddRow(2, 1))
mock.ExpectQuery("SELECT(.+)files").WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).AddRow(1, "ab"))
err = fs.CreateDirectory(ctx, "/ad/ab")
_, err = fs.CreateDirectory(ctx, "/ad/ab")
asserts.Equal(ErrFileExisted, err)
asserts.NoError(mock.ExpectationsWereMet())
@@ -183,7 +183,7 @@ func TestFileSystem_CreateDirectory(t *testing.T) {
mock.ExpectBegin()
mock.ExpectExec("INSERT(.+)").WillReturnError(errors.New("s"))
mock.ExpectRollback()
err = fs.CreateDirectory(ctx, "/ad/ab")
_, err = fs.CreateDirectory(ctx, "/ad/ab")
asserts.Error(err)
asserts.NoError(mock.ExpectationsWereMet())
@@ -201,7 +201,7 @@ func TestFileSystem_CreateDirectory(t *testing.T) {
mock.ExpectBegin()
mock.ExpectExec("INSERT(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
err = fs.CreateDirectory(ctx, "/ad/ab")
_, err = fs.CreateDirectory(ctx, "/ad/ab")
asserts.NoError(err)
asserts.NoError(mock.ExpectationsWereMet())
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/HFO4/cloudreve/pkg/serializer"
"github.com/HFO4/cloudreve/pkg/util"
"github.com/gin-gonic/gin"
"io"
"os"
"path"
)
@@ -186,8 +187,45 @@ func (fs *FileSystem) GetUploadToken(ctx context.Context, path string, size uint
return &credential, nil
}
// UploadFromStream 从文件流上传文件
func (fs *FileSystem) UploadFromStream(ctx context.Context, src io.ReadCloser, dst string, size uint64) error {
// 构建文件头
fileName := path.Base(dst)
filePath := path.Dir(dst)
fileData := local.FileStream{
File: src,
Size: size,
Name: fileName,
VirtualPath: filePath,
}
// 给文件系统分配钩子
fs.Lock.Lock()
if fs.Hooks == nil {
fs.Use("BeforeUpload", HookValidateFile)
fs.Use("BeforeUpload", HookValidateCapacity)
fs.Use("AfterUploadCanceled", HookDeleteTempFile)
fs.Use("AfterUploadCanceled", HookGiveBackCapacity)
fs.Use("AfterUpload", GenericAfterUpload)
fs.Use("AfterValidateFailed", HookDeleteTempFile)
fs.Use("AfterValidateFailed", HookGiveBackCapacity)
fs.Use("AfterUploadFailed", HookGiveBackCapacity)
}
fs.Lock.Unlock()
// 开始上传
return fs.Upload(ctx, fileData)
}
// UploadFromPath 将本机已有文件上传到用户的文件系统
func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string) error {
// 重设存储策略
fs.Policy = &fs.User.Policy
err := fs.DispatchHandler()
if err != nil {
return err
}
file, err := os.Open(src)
if err != nil {
return err
@@ -201,26 +239,6 @@ func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string) error
}
size := fi.Size()
// 构建文件头
fileName := path.Base(dst)
filePath := path.Dir(dst)
fileData := local.FileStream{
File: file,
Size: uint64(size),
Name: fileName,
VirtualPath: filePath,
}
// 给文件系统分配钩子
fs.Use("BeforeUpload", HookValidateFile)
fs.Use("BeforeUpload", HookValidateCapacity)
fs.Use("AfterUploadCanceled", HookDeleteTempFile)
fs.Use("AfterUploadCanceled", HookGiveBackCapacity)
fs.Use("AfterUpload", GenericAfterUpload)
fs.Use("AfterValidateFailed", HookDeleteTempFile)
fs.Use("AfterValidateFailed", HookGiveBackCapacity)
fs.Use("AfterUploadFailed", HookGiveBackCapacity)
// 开始上传
return fs.Upload(ctx, fileData)
return fs.UploadFromStream(ctx, file, dst, uint64(size))
}