mirror of
https://github.com/halejohn/Cloudreve.git
synced 2026-01-26 09:34:57 +08:00
Merge pull request #1232 from cloudreve/archiver-decompress
Archiver decompress
This commit is contained in:
@@ -2,11 +2,9 @@ package filesystem
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
@@ -18,8 +16,7 @@ import (
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/util"
|
||||
"github.com/gin-gonic/gin"
|
||||
"golang.org/x/text/encoding/simplifiedchinese"
|
||||
"golang.org/x/text/transform"
|
||||
"github.com/mholt/archiver/v4"
|
||||
)
|
||||
|
||||
/* ===============
|
||||
@@ -168,7 +165,7 @@ func (fs *FileSystem) doCompress(ctx context.Context, file *model.File, folder *
|
||||
}
|
||||
|
||||
// Decompress 解压缩给定压缩文件到dst目录
|
||||
func (fs *FileSystem) Decompress(ctx context.Context, src, dst string) error {
|
||||
func (fs *FileSystem) Decompress(ctx context.Context, src, dst, encoding string) error {
|
||||
err := fs.ResetFileIfNotExist(ctx, src)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -206,21 +203,41 @@ func (fs *FileSystem) Decompress(ctx context.Context, src, dst string) error {
|
||||
}
|
||||
defer zipFile.Close()
|
||||
|
||||
_, err = io.Copy(zipFile, fileStream)
|
||||
// 下载前先判断是否是可解压的格式
|
||||
format, readStream, err := archiver.Identify(fs.FileTarget[0].SourceName, fileStream)
|
||||
if err != nil {
|
||||
util.Log().Warning("无法写入临时压缩文件 %s , %s", tempZipFilePath, err)
|
||||
util.Log().Warning("无法识别文件格式 %s , %s", fs.FileTarget[0].SourceName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
zipFile.Close()
|
||||
fileStream.Close()
|
||||
|
||||
// 解压缩文件
|
||||
r, err := zip.OpenReader(tempZipFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
extractor, ok := format.(archiver.Extractor)
|
||||
if !ok {
|
||||
return fmt.Errorf("file not an extractor %s", fs.FileTarget[0].SourceName)
|
||||
}
|
||||
|
||||
// 只有zip格式可以多个文件同时上传
|
||||
var isZip bool
|
||||
switch extractor.(type) {
|
||||
case archiver.Zip:
|
||||
extractor = archiver.Zip{TextEncoding: encoding}
|
||||
isZip = true
|
||||
}
|
||||
|
||||
// 除了zip必须下载到本地,其余的可以边下载边解压
|
||||
reader := readStream
|
||||
if isZip {
|
||||
_, err = io.Copy(zipFile, readStream)
|
||||
if err != nil {
|
||||
util.Log().Warning("无法写入临时压缩文件 %s , %s", tempZipFilePath, err)
|
||||
return err
|
||||
}
|
||||
|
||||
fileStream.Close()
|
||||
|
||||
// 设置文件偏移量
|
||||
zipFile.Seek(0, io.SeekStart)
|
||||
reader = zipFile
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
// 重设存储策略
|
||||
fs.Policy = &fs.User.Policy
|
||||
@@ -236,64 +253,64 @@ func (fs *FileSystem) Decompress(ctx context.Context, src, dst string) error {
|
||||
worker <- i
|
||||
}
|
||||
|
||||
for _, f := range r.File {
|
||||
fileName := f.Name
|
||||
// 处理非UTF-8编码
|
||||
if f.NonUTF8 {
|
||||
i := bytes.NewReader([]byte(fileName))
|
||||
decoder := transform.NewReader(i, simplifiedchinese.GB18030.NewDecoder())
|
||||
content, _ := ioutil.ReadAll(decoder)
|
||||
fileName = string(content)
|
||||
}
|
||||
// 上传文件函数
|
||||
uploadFunc := func(fileStream io.ReadCloser, size int64, savePath, rawPath string) {
|
||||
defer func() {
|
||||
if isZip {
|
||||
worker <- 1
|
||||
wg.Done()
|
||||
}
|
||||
if err := recover(); err != nil {
|
||||
util.Log().Warning("上传压缩包内文件时出错")
|
||||
fmt.Println(err)
|
||||
}
|
||||
}()
|
||||
|
||||
rawPath := util.FormSlash(fileName)
|
||||
err := fs.UploadFromStream(ctx, &fsctx.FileStream{
|
||||
File: fileStream,
|
||||
Size: uint64(size),
|
||||
Name: path.Base(savePath),
|
||||
VirtualPath: path.Dir(savePath),
|
||||
}, true)
|
||||
fileStream.Close()
|
||||
if err != nil {
|
||||
util.Log().Debug("无法上传压缩包内的文件%s , %s , 跳过", rawPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
// 解压缩文件,回调函数如果出错会停止解压的下一步进行,全部return nil
|
||||
err = extractor.Extract(ctx, reader, nil, func(ctx context.Context, f archiver.File) error {
|
||||
rawPath := util.FormSlash(f.NameInArchive)
|
||||
savePath := path.Join(dst, rawPath)
|
||||
// 路径是否合法
|
||||
if !strings.HasPrefix(savePath, util.FillSlash(path.Clean(dst))) {
|
||||
return fmt.Errorf("%s: illegal file path", f.Name)
|
||||
util.Log().Warning("%s: illegal file path", f.NameInArchive)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 如果是目录
|
||||
if f.FileInfo().IsDir() {
|
||||
if f.FileInfo.IsDir() {
|
||||
fs.CreateDirectory(ctx, savePath)
|
||||
continue
|
||||
return nil
|
||||
}
|
||||
|
||||
// 上传文件
|
||||
fileStream, err := f.Open()
|
||||
if err != nil {
|
||||
util.Log().Warning("无法打开压缩包内文件%s , %s , 跳过", rawPath, err)
|
||||
continue
|
||||
return nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-worker:
|
||||
if !isZip {
|
||||
uploadFunc(fileStream, f.FileInfo.Size(), savePath, rawPath)
|
||||
} else {
|
||||
<-worker
|
||||
wg.Add(1)
|
||||
go func(fileStream io.ReadCloser, size int64) {
|
||||
defer func() {
|
||||
worker <- 1
|
||||
wg.Done()
|
||||
if err := recover(); err != nil {
|
||||
util.Log().Warning("上传压缩包内文件时出错")
|
||||
fmt.Println(err)
|
||||
}
|
||||
}()
|
||||
|
||||
err = fs.UploadFromStream(ctx, &fsctx.FileStream{
|
||||
File: fileStream,
|
||||
Size: uint64(size),
|
||||
Name: path.Base(savePath),
|
||||
VirtualPath: path.Dir(savePath),
|
||||
}, true)
|
||||
fileStream.Close()
|
||||
if err != nil {
|
||||
util.Log().Debug("无法上传压缩包内的文件%s , %s , 跳过", rawPath, err)
|
||||
}
|
||||
}(fileStream, f.FileInfo().Size())
|
||||
go uploadFunc(fileStream, f.FileInfo.Size(), savePath, rawPath)
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
})
|
||||
wg.Wait()
|
||||
return nil
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
testMock "github.com/stretchr/testify/mock"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
@@ -147,12 +149,24 @@ func (m MockRSC) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var basepath string
|
||||
|
||||
func init() {
|
||||
_, currentFile, _, _ := runtime.Caller(0)
|
||||
basepath = filepath.Dir(currentFile)
|
||||
}
|
||||
|
||||
func Path(rel string) string {
|
||||
return filepath.Join(basepath, rel)
|
||||
}
|
||||
|
||||
func TestFileSystem_Decompress(t *testing.T) {
|
||||
asserts := assert.New(t)
|
||||
ctx := context.Background()
|
||||
fs := FileSystem{
|
||||
User: &model.User{Model: gorm.Model{ID: 1}},
|
||||
}
|
||||
os.RemoveAll(util.RelativePath("tests/decompress"))
|
||||
|
||||
// 压缩文件不存在
|
||||
{
|
||||
@@ -162,7 +176,7 @@ func TestFileSystem_Decompress(t *testing.T) {
|
||||
// 查找压缩文件,未找到
|
||||
mock.ExpectQuery("SELECT(.+)files(.+)").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}))
|
||||
err := fs.Decompress(ctx, "/1.zip", "/")
|
||||
err := fs.Decompress(ctx, "/1.zip", "/", "")
|
||||
asserts.NoError(mock.ExpectationsWereMet())
|
||||
asserts.Error(err)
|
||||
}
|
||||
@@ -174,7 +188,7 @@ func TestFileSystem_Decompress(t *testing.T) {
|
||||
testHandler := new(FileHeaderMock)
|
||||
testHandler.On("Get", testMock.Anything, "1.zip").Return(MockRSC{}, errors.New("error"))
|
||||
fs.Handler = testHandler
|
||||
err := fs.Decompress(ctx, "/1.zip", "/")
|
||||
err := fs.Decompress(ctx, "/1.zip", "/", "")
|
||||
asserts.NoError(mock.ExpectationsWereMet())
|
||||
asserts.Error(err)
|
||||
asserts.EqualError(err, "error")
|
||||
@@ -188,7 +202,7 @@ func TestFileSystem_Decompress(t *testing.T) {
|
||||
testHandler := new(FileHeaderMock)
|
||||
testHandler.On("Get", testMock.Anything, "1.zip").Return(MockRSC{}, nil)
|
||||
fs.Handler = testHandler
|
||||
err := fs.Decompress(ctx, "/1.zip", "/")
|
||||
err := fs.Decompress(ctx, "/1.zip", "/", "")
|
||||
asserts.NoError(mock.ExpectationsWereMet())
|
||||
asserts.Error(err)
|
||||
}
|
||||
@@ -201,13 +215,13 @@ func TestFileSystem_Decompress(t *testing.T) {
|
||||
testHandler := new(FileHeaderMock)
|
||||
testHandler.On("Get", testMock.Anything, "1.zip").Return(MockNopRSC("1"), nil)
|
||||
fs.Handler = testHandler
|
||||
err := fs.Decompress(ctx, "/1.zip", "/")
|
||||
err := fs.Decompress(ctx, "/1.zip", "/", "")
|
||||
asserts.NoError(mock.ExpectationsWereMet())
|
||||
asserts.Error(err)
|
||||
asserts.EqualError(err, "read error")
|
||||
asserts.Contains(err.Error(), "read error")
|
||||
}
|
||||
|
||||
// 无效zip文件
|
||||
// 无法重设上传策略
|
||||
{
|
||||
cache.Set("setting_temp_path", "tests", 0)
|
||||
fs.FileTarget = []model.File{{SourceName: "1.zip", Policy: model.Policy{Type: "mock"}}}
|
||||
@@ -215,22 +229,7 @@ func TestFileSystem_Decompress(t *testing.T) {
|
||||
testHandler := new(FileHeaderMock)
|
||||
testHandler.On("Get", testMock.Anything, "1.zip").Return(MockRSC{rs: strings.NewReader("read")}, nil)
|
||||
fs.Handler = testHandler
|
||||
err := fs.Decompress(ctx, "/1.zip", "/")
|
||||
asserts.NoError(mock.ExpectationsWereMet())
|
||||
asserts.Error(err)
|
||||
asserts.EqualError(err, "zip: not a valid zip file")
|
||||
}
|
||||
|
||||
// 无法重设上传策略
|
||||
{
|
||||
zipFile, _ := os.Open(util.RelativePath("filesystem/tests/test.zip"))
|
||||
fs.FileTarget = []model.File{{SourceName: "1.zip", Policy: model.Policy{Type: "mock"}}}
|
||||
fs.FileTarget[0].Policy.ID = 1
|
||||
testHandler := new(FileHeaderMock)
|
||||
testHandler.On("Get", testMock.Anything, "1.zip").Return(zipFile, nil)
|
||||
fs.Handler = testHandler
|
||||
err := fs.Decompress(ctx, "/1.zip", "/")
|
||||
zipFile.Close()
|
||||
err := fs.Decompress(ctx, "/1.zip", "/", "")
|
||||
asserts.NoError(mock.ExpectationsWereMet())
|
||||
asserts.Error(err)
|
||||
asserts.True(util.IsEmpty(util.RelativePath("tests/decompress")))
|
||||
@@ -239,7 +238,7 @@ func TestFileSystem_Decompress(t *testing.T) {
|
||||
// 无法上传,容量不足
|
||||
{
|
||||
cache.Set("setting_max_parallel_transfer", "1", 0)
|
||||
zipFile, _ := os.Open(util.RelativePath("filesystem/tests/test.zip"))
|
||||
zipFile, _ := os.Open(Path("tests/test.zip"))
|
||||
fs.FileTarget = []model.File{{SourceName: "1.zip", Policy: model.Policy{Type: "mock"}}}
|
||||
fs.FileTarget[0].Policy.ID = 1
|
||||
fs.User.Policy.Type = "mock"
|
||||
@@ -247,7 +246,7 @@ func TestFileSystem_Decompress(t *testing.T) {
|
||||
testHandler.On("Get", testMock.Anything, "1.zip").Return(zipFile, nil)
|
||||
fs.Handler = testHandler
|
||||
|
||||
fs.Decompress(ctx, "/1.zip", "/")
|
||||
fs.Decompress(ctx, "/1.zip", "/", "")
|
||||
|
||||
zipFile.Close()
|
||||
|
||||
|
||||
@@ -20,8 +20,9 @@ type DecompressTask struct {
|
||||
|
||||
// DecompressProps 压缩任务属性
|
||||
type DecompressProps struct {
|
||||
Src string `json:"src"`
|
||||
Dst string `json:"dst"`
|
||||
Src string `json:"src"`
|
||||
Dst string `json:"dst"`
|
||||
Encoding string `json:"encoding"`
|
||||
}
|
||||
|
||||
// Props 获取任务属性
|
||||
@@ -82,7 +83,7 @@ func (job *DecompressTask) Do() {
|
||||
|
||||
job.TaskModel.SetProgress(DecompressingProgress)
|
||||
|
||||
err = fs.Decompress(context.Background(), job.TaskProps.Src, job.TaskProps.Dst)
|
||||
err = fs.Decompress(context.Background(), job.TaskProps.Src, job.TaskProps.Dst, job.TaskProps.Encoding)
|
||||
if err != nil {
|
||||
job.SetErrorMsg("解压缩失败", err)
|
||||
return
|
||||
@@ -91,12 +92,13 @@ func (job *DecompressTask) Do() {
|
||||
}
|
||||
|
||||
// NewDecompressTask 新建压缩任务
|
||||
func NewDecompressTask(user *model.User, src, dst string) (Job, error) {
|
||||
func NewDecompressTask(user *model.User, src, dst, encoding string) (Job, error) {
|
||||
newTask := &DecompressTask{
|
||||
User: user,
|
||||
TaskProps: DecompressProps{
|
||||
Src: src,
|
||||
Dst: dst,
|
||||
Src: src,
|
||||
Dst: dst,
|
||||
Encoding: encoding,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -99,7 +99,7 @@ func TestNewDecompressTask(t *testing.T) {
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectCommit()
|
||||
job, err := NewDecompressTask(&model.User{}, "/", "/")
|
||||
job, err := NewDecompressTask(&model.User{}, "/", "/", "utf-8")
|
||||
asserts.NoError(mock.ExpectationsWereMet())
|
||||
asserts.NotNil(job)
|
||||
asserts.NoError(err)
|
||||
@@ -110,7 +110,7 @@ func TestNewDecompressTask(t *testing.T) {
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT(.+)").WillReturnError(errors.New("error"))
|
||||
mock.ExpectRollback()
|
||||
job, err := NewDecompressTask(&model.User{}, "/", "/")
|
||||
job, err := NewDecompressTask(&model.User{}, "/", "/", "utf-8")
|
||||
asserts.NoError(mock.ExpectationsWereMet())
|
||||
asserts.Nil(job)
|
||||
asserts.Error(err)
|
||||
|
||||
Reference in New Issue
Block a user