Feat: aria2 download and transfer in slave node (#1040)

* Feat: retrieve nodes from data table

* Feat: master node ping slave node in REST API

* Feat: master send scheduled ping request

* Feat: inactive nodes recover loop

* Modify: remove database operations from aria2 RPC caller implementation

* Feat: init aria2 client in master node

* Feat: Round Robin load balancer

* Feat: create and monitor aria2 task in master node

* Feat: salve receive and handle heartbeat

* Fix: Node ID will be 0 in download record generated in older version

* Feat: sign request headers with all `X-` prefix

* Feat: API call to slave node will carry meta data in headers

* Feat: call slave aria2 rpc method from master

* Feat: get slave aria2 task status
Feat: encode slave response data using gob

* Feat: aria2 callback to master node / cancel or select task to slave node

* Fix: use dummy aria2 client when caller initialize failed in master node

* Feat: slave aria2 status event callback / salve RPC auth

* Feat: prototype for slave driven filesystem

* Feat: retry for init aria2 client in master node

* Feat: init request client with global options

* Feat: slave receive async task from master

* Fix: competition write in request header

* Refactor: dependency initialize order

* Feat: generic message queue implementation

* Feat: message queue implementation

* Feat: master waiting slave transfer result

* Feat: slave transfer file in stateless policy

* Feat: slave transfer file in slave policy

* Feat: slave transfer file in local policy

* Feat: slave transfer file in OneDrive policy

* Fix: failed to initialize update checker http client

* Feat: list slave nodes for dashboard

* Feat: test aria2 rpc connection in slave

* Feat: add and save node

* Feat: add and delete node in node pool

* Fix: temp file cannot be removed when aria2 task fails

* Fix: delete node in admin panel

* Feat: edit node and get node info

* Modify: delete unused settings
This commit is contained in:
AaronLiu
2021-10-31 09:41:56 +08:00
committed by GitHub
parent a3b4a22dbc
commit 056de22edb
74 changed files with 3647 additions and 715 deletions

View File

@@ -1,169 +1,65 @@
package aria2
import (
"encoding/json"
"context"
"fmt"
"net/url"
"sync"
"time"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/monitor"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"github.com/cloudreve/Cloudreve/v3/pkg/balancer"
)
// Instance 默认使用的Aria2处理实例
var Instance Aria2 = &DummyAria2{}
var Instance common.Aria2 = &common.DummyAria2{}
// LB 获取 Aria2 节点的负载均衡器
var LB balancer.Balancer
// Lock Instance的读写锁
var Lock sync.RWMutex
// EventNotifier 任务状态更新通知处理
var EventNotifier = &Notifier{}
// Aria2 离线下载处理接口
type Aria2 interface {
// CreateTask 创建新的任务
CreateTask(task *model.Download, options map[string]interface{}) error
// 返回状态信息
Status(task *model.Download) (rpc.StatusInfo, error)
// 取消任务
Cancel(task *model.Download) error
// 选择要下载的文件
Select(task *model.Download, files []int) error
}
const (
// URLTask 从URL添加的任务
URLTask = iota
// TorrentTask 种子任务
TorrentTask
)
const (
// Ready 准备就绪
Ready = iota
// Downloading 下载中
Downloading
// Paused 暂停中
Paused
// Error 出错
Error
// Complete 完成
Complete
// Canceled 取消/停止
Canceled
// Unknown 未知状态
Unknown
)
var (
// ErrNotEnabled 功能未开启错误
ErrNotEnabled = serializer.NewError(serializer.CodeNoPermissionErr, "离线下载功能未开启", nil)
// ErrUserNotFound 未找到下载任务创建者
ErrUserNotFound = serializer.NewError(serializer.CodeNotFound, "无法找到任务创建者", nil)
)
// DummyAria2 未开启Aria2功能时使用的默认处理器
type DummyAria2 struct {
}
// CreateTask 创建新任务,此处直接返回未开启错误
func (instance *DummyAria2) CreateTask(model *model.Download, options map[string]interface{}) error {
return ErrNotEnabled
}
// Status 返回未开启错误
func (instance *DummyAria2) Status(task *model.Download) (rpc.StatusInfo, error) {
return rpc.StatusInfo{}, ErrNotEnabled
}
// Cancel 返回未开启错误
func (instance *DummyAria2) Cancel(task *model.Download) error {
return ErrNotEnabled
}
// Select 返回未开启错误
func (instance *DummyAria2) Select(task *model.Download, files []int) error {
return ErrNotEnabled
// GetLoadBalancer 返回供Aria2使用的负载均衡
func GetLoadBalancer() balancer.Balancer {
Lock.RLock()
defer Lock.RUnlock()
return LB
}
// Init 初始化
func Init(isReload bool) {
Lock.Lock()
defer Lock.Unlock()
// 关闭上个初始连接
if previousClient, ok := Instance.(*RPCService); ok {
if previousClient.Caller != nil {
util.Log().Debug("关闭上个 aria2 连接")
previousClient.Caller.Close()
}
}
options := model.GetSettingByNames("aria2_rpcurl", "aria2_token", "aria2_options")
timeout := model.GetIntSetting("aria2_call_timeout", 5)
if options["aria2_rpcurl"] == "" {
Instance = &DummyAria2{}
return
}
util.Log().Info("初始化 aria2 RPC 服务[%s]", options["aria2_rpcurl"])
client := &RPCService{}
// 解析RPC服务地址
server, err := url.Parse(options["aria2_rpcurl"])
if err != nil {
util.Log().Warning("无法解析 aria2 RPC 服务地址,%s", err)
Instance = &DummyAria2{}
return
}
server.Path = "/jsonrpc"
// 加载自定义下载配置
var globalOptions map[string]interface{}
err = json.Unmarshal([]byte(options["aria2_options"]), &globalOptions)
if err != nil {
util.Log().Warning("无法解析 aria2 全局配置,%s", err)
Instance = &DummyAria2{}
return
}
if err := client.Init(server.String(), options["aria2_token"], timeout, globalOptions); err != nil {
util.Log().Warning("初始化 aria2 RPC 服务失败,%s", err)
Instance = &DummyAria2{}
return
}
Instance = client
LB = balancer.NewBalancer("RoundRobin")
Lock.Unlock()
if !isReload {
// 从数据库中读取未完成任务,创建监控
unfinished := model.GetDownloadsByStatus(Ready, Paused, Downloading)
unfinished := model.GetDownloadsByStatus(common.Ready, common.Paused, common.Downloading)
for i := 0; i < len(unfinished); i++ {
// 创建任务监控
NewMonitor(&unfinished[i])
monitor.NewMonitor(&unfinished[i])
}
}
}
// getStatus 将给定的状态字符串转换为状态标识数字
func getStatus(status string) int {
switch status {
case "complete":
return Complete
case "active":
return Downloading
case "waiting":
return Ready
case "paused":
return Paused
case "error":
return Error
case "removed":
return Canceled
default:
return Unknown
// TestRPCConnection 发送测试用的 RPC 请求,测试服务连通性
func TestRPCConnection(server, secret string, timeout int) (rpc.VersionInfo, error) {
// 解析RPC服务地址
rpcServer, err := url.Parse(server)
if err != nil {
return rpc.VersionInfo{}, fmt.Errorf("cannot parse RPC server: %w", err)
}
rpcServer.Path = "/jsonrpc"
caller, err := rpc.New(context.Background(), rpcServer.String(), secret, time.Duration(timeout)*time.Second, nil)
if err != nil {
return rpc.VersionInfo{}, fmt.Errorf("cannot initialize rpc connection: %w", err)
}
return caller.GetVersion()
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/DATA-DOG/go-sqlmock"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/monitor"
"github.com/cloudreve/Cloudreve/v3/pkg/cache"
"github.com/jinzhu/gorm"
"github.com/stretchr/testify/assert"
@@ -37,7 +38,7 @@ func TestDummyAria2(t *testing.T) {
}
func TestInit(t *testing.T) {
MAX_RETRY = 0
monitor.MAX_RETRY = 0
asserts := assert.New(t)
cache.Set("setting_aria2_token", "1", 0)
cache.Set("setting_aria2_call_timeout", "5", 0)
@@ -81,11 +82,11 @@ func TestInit(t *testing.T) {
func TestGetStatus(t *testing.T) {
asserts := assert.New(t)
asserts.Equal(4, getStatus("complete"))
asserts.Equal(1, getStatus("active"))
asserts.Equal(0, getStatus("waiting"))
asserts.Equal(2, getStatus("paused"))
asserts.Equal(3, getStatus("error"))
asserts.Equal(5, getStatus("removed"))
asserts.Equal(6, getStatus("?"))
asserts.Equal(4, GetStatus("complete"))
asserts.Equal(1, GetStatus("active"))
asserts.Equal(0, GetStatus("waiting"))
asserts.Equal(2, GetStatus("paused"))
asserts.Equal(3, GetStatus("error"))
asserts.Equal(5, GetStatus("removed"))
asserts.Equal(6, GetStatus("?"))
}

View File

@@ -9,6 +9,7 @@ import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
)
@@ -33,7 +34,7 @@ func (client *RPCService) Init(server, secret string, timeout int, options map[s
Options: options,
}
caller, err := rpc.New(context.Background(), server, secret, time.Duration(timeout)*time.Second,
EventNotifier)
mq.GlobalMQ)
client.Caller = caller
return err
}
@@ -85,7 +86,7 @@ func (client *RPCService) Select(task *model.Download, files []int) error {
}
// CreateTask 创建新任务
func (client *RPCService) CreateTask(task *model.Download, groupOptions map[string]interface{}) error {
func (client *RPCService) CreateTask(task *model.Download, groupOptions map[string]interface{}) (string, error) {
// 生成存储路径
path := filepath.Join(
model.GetSettingByName("aria2_temp_path"),
@@ -106,18 +107,8 @@ func (client *RPCService) CreateTask(task *model.Download, groupOptions map[stri
gid, err := client.Caller.AddURI(task.Source, options)
if err != nil || gid == "" {
return err
return "", err
}
// 保存到数据库
task.GID = gid
_, err = task.Create()
if err != nil {
return err
}
// 创建任务监控
NewMonitor(task)
return nil
return gid, nil
}

114
pkg/aria2/common/common.go Normal file
View File

@@ -0,0 +1,114 @@
package common
import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
)
// Aria2 离线下载处理接口
type Aria2 interface {
// Init 初始化客户端连接
Init() error
// CreateTask 创建新的任务
CreateTask(task *model.Download, options map[string]interface{}) (string, error)
// 返回状态信息
Status(task *model.Download) (rpc.StatusInfo, error)
// 取消任务
Cancel(task *model.Download) error
// 选择要下载的文件
Select(task *model.Download, files []int) error
// 获取离线下载配置
GetConfig() model.Aria2Option
// 删除临时下载文件
DeleteTempFile(*model.Download) error
}
const (
// URLTask 从URL添加的任务
URLTask = iota
// TorrentTask 种子任务
TorrentTask
)
const (
// Ready 准备就绪
Ready = iota
// Downloading 下载中
Downloading
// Paused 暂停中
Paused
// Error 出错
Error
// Complete 完成
Complete
// Canceled 取消/停止
Canceled
// Unknown 未知状态
Unknown
)
var (
// ErrNotEnabled 功能未开启错误
ErrNotEnabled = serializer.NewError(serializer.CodeNoPermissionErr, "离线下载功能未开启", nil)
// ErrUserNotFound 未找到下载任务创建者
ErrUserNotFound = serializer.NewError(serializer.CodeNotFound, "无法找到任务创建者", nil)
)
// DummyAria2 未开启Aria2功能时使用的默认处理器
type DummyAria2 struct {
}
func (instance *DummyAria2) Init() error {
return nil
}
// CreateTask 创建新任务,此处直接返回未开启错误
func (instance *DummyAria2) CreateTask(model *model.Download, options map[string]interface{}) (string, error) {
return "", ErrNotEnabled
}
// Status 返回未开启错误
func (instance *DummyAria2) Status(task *model.Download) (rpc.StatusInfo, error) {
return rpc.StatusInfo{}, ErrNotEnabled
}
// Cancel 返回未开启错误
func (instance *DummyAria2) Cancel(task *model.Download) error {
return ErrNotEnabled
}
// Select 返回未开启错误
func (instance *DummyAria2) Select(task *model.Download, files []int) error {
return ErrNotEnabled
}
// GetConfig 返回空的
func (instance *DummyAria2) GetConfig() model.Aria2Option {
return model.Aria2Option{}
}
// GetConfig 返回空的
func (instance *DummyAria2) DeleteTempFile(src *model.Download) error {
return ErrNotEnabled
}
// GetStatus 将给定的状态字符串转换为状态标识数字
func GetStatus(status string) int {
switch status {
case "complete":
return Complete
case "active":
return Downloading
case "waiting":
return Ready
case "paused":
return Paused
case "error":
return Error
case "removed":
return Canceled
default:
return Unknown
}
}

View File

@@ -1,19 +1,21 @@
package aria2
package monitor
import (
"context"
"encoding/json"
"errors"
"os"
"path/filepath"
"strconv"
"time"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/local"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/task"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
)
@@ -23,32 +25,34 @@ type Monitor struct {
Task *model.Download
Interval time.Duration
notifier chan StatusEvent
notifier <-chan mq.Message
node cluster.Node
retried int
}
// StatusEvent 状态改变事件
type StatusEvent struct {
GID string
Status int
}
var MAX_RETRY = 10
// NewMonitor 新建上传状态监控
// NewMonitor 新建离线下载状态监控
func NewMonitor(task *model.Download) {
monitor := &Monitor{
Task: task,
Interval: time.Duration(model.GetIntSetting("aria2_interval", 10)) * time.Second,
notifier: make(chan StatusEvent),
notifier: make(chan mq.Message),
node: cluster.Default.GetNodeByID(task.GetNodeID()),
}
if monitor.node != nil {
monitor.Interval = time.Duration(monitor.node.GetAria2Instance().GetConfig().Interval) * time.Second
go monitor.Loop()
monitor.notifier = mq.GlobalMQ.Subscribe(monitor.Task.GID, 0)
} else {
monitor.setErrorStatus(errors.New("节点不可用"))
}
go monitor.Loop()
EventNotifier.Subscribe(monitor.notifier, monitor.Task.GID)
}
// Loop 开启监控循环
func (monitor *Monitor) Loop() {
defer EventNotifier.Unsubscribe(monitor.Task.GID)
defer mq.GlobalMQ.Unsubscribe(monitor.Task.GID, monitor.notifier)
// 首次循环立即更新
interval := time.Duration(0)
@@ -70,9 +74,7 @@ func (monitor *Monitor) Loop() {
// Update 更新状态,返回值表示是否退出监控
func (monitor *Monitor) Update() bool {
Lock.RLock()
status, err := Instance.Status(monitor.Task)
Lock.RUnlock()
status, err := monitor.node.GetAria2Instance().Status(monitor.Task)
if err != nil {
monitor.retried++
@@ -102,6 +104,7 @@ func (monitor *Monitor) Update() bool {
if err := monitor.UpdateTaskInfo(status); err != nil {
util.Log().Warning("无法更新下载任务[%s]的任务信息[%s]", monitor.Task.GID, err)
monitor.setErrorStatus(err)
monitor.RemoveTempFolder()
return true
}
@@ -115,7 +118,7 @@ func (monitor *Monitor) Update() bool {
case "active", "waiting", "paused":
return false
case "removed":
monitor.Task.Status = Canceled
monitor.Task.Status = common.Canceled
monitor.Task.Save()
monitor.RemoveTempFolder()
return true
@@ -130,7 +133,7 @@ func (monitor *Monitor) UpdateTaskInfo(status rpc.StatusInfo) error {
originSize := monitor.Task.TotalSize
monitor.Task.GID = status.Gid
monitor.Task.Status = getStatus(status.Status)
monitor.Task.Status = common.GetStatus(status.Status)
// 文件大小、已下载大小
total, err := strconv.ParseUint(status.TotalLength, 10, 64)
@@ -164,9 +167,7 @@ func (monitor *Monitor) UpdateTaskInfo(status rpc.StatusInfo) error {
// 文件大小更新后,对文件限制等进行校验
if err := monitor.ValidateFile(); err != nil {
// 验证失败时取消任务
Lock.RLock()
Instance.Cancel(monitor.Task)
Lock.RUnlock()
monitor.node.GetAria2Instance().Cancel(monitor.Task)
return err
}
}
@@ -179,7 +180,7 @@ func (monitor *Monitor) ValidateFile() error {
// 找到任务创建者
user := monitor.Task.GetOwner()
if user == nil {
return ErrUserNotFound
return common.ErrUserNotFound
}
// 创建文件系统
@@ -230,28 +231,31 @@ func (monitor *Monitor) Error(status rpc.StatusInfo) bool {
// RemoveTempFolder 清理下载临时目录
func (monitor *Monitor) RemoveTempFolder() {
err := os.RemoveAll(monitor.Task.Parent)
if err != nil {
util.Log().Warning("无法删除离线下载临时目录[%s], %s", monitor.Task.Parent, err)
}
monitor.node.GetAria2Instance().DeleteTempFile(monitor.Task)
}
// Complete 完成下载,返回是否中断监控
func (monitor *Monitor) Complete(status rpc.StatusInfo) bool {
// 创建中转任务
file := make([]string, 0, len(monitor.Task.StatusInfo.Files))
sizes := make(map[string]uint64, len(monitor.Task.StatusInfo.Files))
for i := 0; i < len(monitor.Task.StatusInfo.Files); i++ {
if monitor.Task.StatusInfo.Files[i].Selected == "true" {
file = append(file, monitor.Task.StatusInfo.Files[i].Path)
fileInfo := monitor.Task.StatusInfo.Files[i]
if fileInfo.Selected == "true" {
file = append(file, fileInfo.Path)
size, _ := strconv.ParseUint(fileInfo.Length, 10, 64)
sizes[fileInfo.Path] = size
}
}
job, err := task.NewTransferTask(
monitor.Task.UserID,
file,
monitor.Task.Dst,
monitor.Task.Parent,
true,
monitor.node.ID(),
sizes,
)
if err != nil {
monitor.setErrorStatus(err)
@@ -269,7 +273,7 @@ func (monitor *Monitor) Complete(status rpc.StatusInfo) bool {
}
func (monitor *Monitor) setErrorStatus(err error) {
monitor.Task.Status = Error
monitor.Task.Status = common.Error
monitor.Task.Error = err.Error()
monitor.Task.Save()
}

View File

@@ -1,4 +1,4 @@
package aria2
package monitor
import (
"errors"
@@ -7,6 +7,8 @@ import (
"github.com/DATA-DOG/go-sqlmock"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/cache"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
@@ -44,13 +46,13 @@ func (m InstanceMock) Select(task *model.Download, files []int) error {
func TestNewMonitor(t *testing.T) {
asserts := assert.New(t)
NewMonitor(&model.Download{GID: "gid"})
_, ok := EventNotifier.Subscribes.Load("gid")
_, ok := common.EventNotifier.Subscribes.Load("gid")
asserts.True(ok)
}
func TestMonitor_Loop(t *testing.T) {
asserts := assert.New(t)
notifier := make(chan StatusEvent)
notifier := make(chan common.StatusEvent)
MAX_RETRY = 0
monitor := &Monitor{
Task: &model.Download{GID: "gid"},
@@ -76,10 +78,10 @@ func TestMonitor_Update(t *testing.T) {
{
MAX_RETRY = 1
testInstance := new(InstanceMock)
testInstance.On("Status", testMock.Anything).Return(rpc.StatusInfo{}, errors.New("error"))
testInstance.On("SlaveStatus", testMock.Anything).Return(rpc.StatusInfo{}, errors.New("error"))
file, _ := util.CreatNestedFile("TestMonitor_Update/1")
file.Close()
Instance = testInstance
aria2.Instance = testInstance
asserts.False(monitor.Update())
asserts.True(monitor.Update())
testInstance.AssertExpectations(t)
@@ -89,16 +91,16 @@ func TestMonitor_Update(t *testing.T) {
// 磁力链下载重定向
{
testInstance := new(InstanceMock)
testInstance.On("Status", testMock.Anything).Return(rpc.StatusInfo{
testInstance.On("SlaveStatus", testMock.Anything).Return(rpc.StatusInfo{
FollowedBy: []string{"1"},
}, nil)
monitor.Task.ID = 1
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
Instance = testInstance
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
aria2.mock.ExpectCommit()
aria2.Instance = testInstance
asserts.False(monitor.Update())
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(aria2.mock.ExpectationsWereMet())
testInstance.AssertExpectations(t)
asserts.EqualValues("1", monitor.Task.GID)
}
@@ -106,82 +108,82 @@ func TestMonitor_Update(t *testing.T) {
// 无法更新任务信息
{
testInstance := new(InstanceMock)
testInstance.On("Status", testMock.Anything).Return(rpc.StatusInfo{}, nil)
testInstance.On("SlaveStatus", testMock.Anything).Return(rpc.StatusInfo{}, nil)
monitor.Task.ID = 1
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnError(errors.New("error"))
mock.ExpectRollback()
Instance = testInstance
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("UPDATE(.+)").WillReturnError(errors.New("error"))
aria2.mock.ExpectRollback()
aria2.Instance = testInstance
asserts.True(monitor.Update())
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(aria2.mock.ExpectationsWereMet())
testInstance.AssertExpectations(t)
}
// 返回未知状态
{
testInstance := new(InstanceMock)
testInstance.On("Status", testMock.Anything).Return(rpc.StatusInfo{Status: "?"}, nil)
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
Instance = testInstance
testInstance.On("SlaveStatus", testMock.Anything).Return(rpc.StatusInfo{Status: "?"}, nil)
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
aria2.mock.ExpectCommit()
aria2.Instance = testInstance
asserts.True(monitor.Update())
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(aria2.mock.ExpectationsWereMet())
testInstance.AssertExpectations(t)
}
// 返回被取消状态
{
testInstance := new(InstanceMock)
testInstance.On("Status", testMock.Anything).Return(rpc.StatusInfo{Status: "removed"}, nil)
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
Instance = testInstance
testInstance.On("SlaveStatus", testMock.Anything).Return(rpc.StatusInfo{Status: "removed"}, nil)
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
aria2.mock.ExpectCommit()
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
aria2.mock.ExpectCommit()
aria2.Instance = testInstance
asserts.True(monitor.Update())
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(aria2.mock.ExpectationsWereMet())
testInstance.AssertExpectations(t)
}
// 返回活跃状态
{
testInstance := new(InstanceMock)
testInstance.On("Status", testMock.Anything).Return(rpc.StatusInfo{Status: "active"}, nil)
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
Instance = testInstance
testInstance.On("SlaveStatus", testMock.Anything).Return(rpc.StatusInfo{Status: "active"}, nil)
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
aria2.mock.ExpectCommit()
aria2.Instance = testInstance
asserts.False(monitor.Update())
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(aria2.mock.ExpectationsWereMet())
testInstance.AssertExpectations(t)
}
// 返回错误状态
{
testInstance := new(InstanceMock)
testInstance.On("Status", testMock.Anything).Return(rpc.StatusInfo{Status: "error"}, nil)
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
Instance = testInstance
testInstance.On("SlaveStatus", testMock.Anything).Return(rpc.StatusInfo{Status: "error"}, nil)
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
aria2.mock.ExpectCommit()
aria2.Instance = testInstance
asserts.True(monitor.Update())
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(aria2.mock.ExpectationsWereMet())
testInstance.AssertExpectations(t)
}
// 返回完成
{
testInstance := new(InstanceMock)
testInstance.On("Status", testMock.Anything).Return(rpc.StatusInfo{Status: "complete"}, nil)
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
Instance = testInstance
testInstance.On("SlaveStatus", testMock.Anything).Return(rpc.StatusInfo{Status: "complete"}, nil)
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
aria2.mock.ExpectCommit()
aria2.Instance = testInstance
asserts.True(monitor.Update())
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(aria2.mock.ExpectationsWereMet())
testInstance.AssertExpectations(t)
}
}
@@ -198,34 +200,34 @@ func TestMonitor_UpdateTaskInfo(t *testing.T) {
// 失败
{
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnError(errors.New("error"))
mock.ExpectRollback()
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("UPDATE(.+)").WillReturnError(errors.New("error"))
aria2.mock.ExpectRollback()
err := monitor.UpdateTaskInfo(rpc.StatusInfo{})
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(aria2.mock.ExpectationsWereMet())
asserts.Error(err)
}
// 更新成功,无需校验
{
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
aria2.mock.ExpectCommit()
err := monitor.UpdateTaskInfo(rpc.StatusInfo{})
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(aria2.mock.ExpectationsWereMet())
asserts.NoError(err)
}
// 更新成功,大小改变,需要校验,校验失败
{
testInstance := new(InstanceMock)
testInstance.On("Cancel", testMock.Anything).Return(nil)
Instance = testInstance
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
testInstance.On("SlaveCancel", testMock.Anything).Return(nil)
aria2.Instance = testInstance
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
aria2.mock.ExpectCommit()
err := monitor.UpdateTaskInfo(rpc.StatusInfo{TotalLength: "1"})
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(aria2.mock.ExpectationsWereMet())
asserts.Error(err)
testInstance.AssertExpectations(t)
}
@@ -308,17 +310,17 @@ func TestMonitor_Complete(t *testing.T) {
}
cache.Set("setting_max_worker_num", "1", 0)
mock.ExpectQuery("SELECT(.+)tasks").WillReturnRows(sqlmock.NewRows([]string{"id"}))
aria2.mock.ExpectQuery("SELECT(.+)tasks").WillReturnRows(sqlmock.NewRows([]string{"id"}))
task.Init()
mock.ExpectQuery("SELECT(.+)users").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1))
mock.ExpectQuery("SELECT(.+)policies").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1))
mock.ExpectBegin()
mock.ExpectExec("INSERT(.+)tasks").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
aria2.mock.ExpectQuery("SELECT(.+)users").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1))
aria2.mock.ExpectQuery("SELECT(.+)policies").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1))
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("INSERT(.+)tasks").WillReturnResult(sqlmock.NewResult(1, 1))
aria2.mock.ExpectCommit()
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)downloads").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
aria2.mock.ExpectBegin()
aria2.mock.ExpectExec("UPDATE(.+)downloads").WillReturnResult(sqlmock.NewResult(1, 1))
aria2.mock.ExpectCommit()
asserts.True(monitor.Complete(rpc.StatusInfo{}))
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(aria2.mock.ExpectationsWereMet())
}

View File

@@ -1,64 +0,0 @@
package aria2
import (
"sync"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
)
// Notifier aria2实践通知处理
type Notifier struct {
Subscribes sync.Map
}
// Subscribe 订阅事件通知
func (notifier *Notifier) Subscribe(target chan StatusEvent, gid string) {
notifier.Subscribes.Store(gid, target)
}
// Unsubscribe 取消订阅事件通知
func (notifier *Notifier) Unsubscribe(gid string) {
notifier.Subscribes.Delete(gid)
}
// Notify 发送通知
func (notifier *Notifier) Notify(events []rpc.Event, status int) {
for _, event := range events {
if target, ok := notifier.Subscribes.Load(event.Gid); ok {
target.(chan StatusEvent) <- StatusEvent{
GID: event.Gid,
Status: status,
}
}
}
}
// OnDownloadStart 下载开始
func (notifier *Notifier) OnDownloadStart(events []rpc.Event) {
notifier.Notify(events, Downloading)
}
// OnDownloadPause 下载暂停
func (notifier *Notifier) OnDownloadPause(events []rpc.Event) {
notifier.Notify(events, Paused)
}
// OnDownloadStop 下载停止
func (notifier *Notifier) OnDownloadStop(events []rpc.Event) {
notifier.Notify(events, Canceled)
}
// OnDownloadComplete 下载完成
func (notifier *Notifier) OnDownloadComplete(events []rpc.Event) {
notifier.Notify(events, Complete)
}
// OnDownloadError 下载出错
func (notifier *Notifier) OnDownloadError(events []rpc.Event) {
notifier.Notify(events, Error)
}
// OnBtDownloadComplete BT下载完成
func (notifier *Notifier) OnBtDownloadComplete(events []rpc.Event) {
notifier.Notify(events, Complete)
}

View File

@@ -2,9 +2,11 @@ package auth
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"sort"
"strings"
"time"
@@ -30,9 +32,8 @@ type Auth interface {
Check(body string, sign string) error
}
// SignRequest 对PUT\POST等复杂HTTP请求签名如果请求Header中
// 包含 X-Policy 则此请求会被认定为上传请求只会对URI部分和
// Policy部分进行签名。其他请求则会对URI和Body部分进行签名。
// SignRequest 对PUT\POST等复杂HTTP请求签名只会对URI部分、
// 请求正文、`X-`开头的header进行签名
func SignRequest(instance Auth, r *http.Request, expires int64) *http.Request {
// 处理有效期
if expires > 0 {
@@ -61,20 +62,31 @@ func CheckRequest(instance Auth, r *http.Request) error {
return instance.Check(getSignContent(r), sign[0])
}
// getSignContent 根据请求Header中是否包含X-Policy判断是否为上传请求
// 返回待签名/验证的字符串
// getSignContent 签名请求 path、正文、以`X-`开头的 Header. 如果 Header 中包含 `X-Policy`
// 则不对正文签名。返回待签名/验证的字符串
func getSignContent(r *http.Request) (rawSignString string) {
if policy, ok := r.Header["X-Policy"]; ok {
rawSignString = serializer.NewRequestSignString(r.URL.Path, policy[0], "")
} else {
var body = []byte{}
// 读取所有body正文
var body = []byte{}
if _, ok := r.Header["X-Policy"]; !ok {
if r.Body != nil {
body, _ = ioutil.ReadAll(r.Body)
_ = r.Body.Close()
r.Body = ioutil.NopCloser(bytes.NewReader(body))
}
rawSignString = serializer.NewRequestSignString(r.URL.Path, "", string(body))
}
// 决定要签名的header
var signedHeader []string
for k, _ := range r.Header {
if strings.HasPrefix(k, "X-") && k != "X-Filename" {
signedHeader = append(signedHeader, fmt.Sprintf("%s=%s", k, r.Header.Get(k)))
}
}
sort.Strings(signedHeader)
// 读取所有待签名Header
rawSignString = serializer.NewRequestSignString(r.URL.Path, strings.Join(signedHeader, "&"), string(body))
return rawSignString
}

15
pkg/balancer/balancer.go Normal file
View File

@@ -0,0 +1,15 @@
package balancer
type Balancer interface {
NextPeer(nodes interface{}) (error, interface{})
}
// NewBalancer 根据策略标识返回新的负载均衡器
func NewBalancer(strategy string) Balancer {
switch strategy {
case "RoundRobin":
return &RoundRobin{}
default:
return &RoundRobin{}
}
}

8
pkg/balancer/errors.go Normal file
View File

@@ -0,0 +1,8 @@
package balancer
import "errors"
var (
ErrInputNotSlice = errors.New("Input value is not silice")
ErrNoAvaliableNode = errors.New("No nodes avaliable")
)

View File

@@ -0,0 +1,30 @@
package balancer
import (
"reflect"
"sync/atomic"
)
type RoundRobin struct {
current uint64
}
// NextPeer 返回轮盘的下一节点
func (r *RoundRobin) NextPeer(nodes interface{}) (error, interface{}) {
v := reflect.ValueOf(nodes)
if v.Kind() != reflect.Slice {
return ErrInputNotSlice, nil
}
if v.Len() == 0 {
return ErrNoAvaliableNode, nil
}
next := r.NextIndex(v.Len())
return nil, v.Index(next).Interface()
}
// NextIndex 返回下一个节点下标
func (r *RoundRobin) NextIndex(total int) int {
return int(atomic.AddUint64(&r.current, uint64(1)) % uint64(total))
}

8
pkg/cluster/errors.go Normal file
View File

@@ -0,0 +1,8 @@
package cluster
import "errors"
var (
ErrFeatureNotExist = errors.New("No nodes in nodepool match the feature specificed")
ErrIlegalPath = errors.New("path out of boundary of setting temp folder")
)

265
pkg/cluster/master.go Normal file
View File

@@ -0,0 +1,265 @@
package cluster
import (
"context"
"encoding/json"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"github.com/gofrs/uuid"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
const deleteTempFileDuration = 60 * time.Second
type MasterNode struct {
Model *model.Node
aria2RPC rpcService
lock sync.RWMutex
}
// RPCService 通过RPC服务的Aria2任务管理器
type rpcService struct {
Caller rpc.Client
Initialized bool
parent *MasterNode
options *clientOptions
}
type clientOptions struct {
Options map[string]interface{} // 创建下载时额外添加的设置
}
// Init 初始化节点
func (node *MasterNode) Init(nodeModel *model.Node) {
node.lock.Lock()
node.Model = nodeModel
node.aria2RPC.parent = node
node.lock.Unlock()
node.lock.RLock()
if node.Model.Aria2Enabled {
node.lock.RUnlock()
node.aria2RPC.Init()
return
}
node.lock.RUnlock()
}
func (node *MasterNode) ID() uint {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Model.ID
}
func (node *MasterNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error) {
return &serializer.NodePingResp{}, nil
}
// IsFeatureEnabled 查询节点的某项功能是否启用
func (node *MasterNode) IsFeatureEnabled(feature string) bool {
node.lock.RLock()
defer node.lock.RUnlock()
switch feature {
case "aria2":
return node.Model.Aria2Enabled
default:
return false
}
}
func (node *MasterNode) MasterAuthInstance() auth.Auth {
node.lock.RLock()
defer node.lock.RUnlock()
return auth.HMACAuth{SecretKey: []byte(node.Model.MasterKey)}
}
func (node *MasterNode) SlaveAuthInstance() auth.Auth {
node.lock.RLock()
defer node.lock.RUnlock()
return auth.HMACAuth{SecretKey: []byte(node.Model.SlaveKey)}
}
// SubscribeStatusChange 订阅节点状态更改
func (node *MasterNode) SubscribeStatusChange(callback func(isActive bool, id uint)) {
}
// IsActive 返回节点是否在线
func (node *MasterNode) IsActive() bool {
return true
}
// Kill 结束aria2请求
func (node *MasterNode) Kill() {
if node.aria2RPC.Caller != nil {
node.aria2RPC.Caller.Close()
}
}
// GetAria2Instance 获取主机Aria2实例
func (node *MasterNode) GetAria2Instance() common.Aria2 {
node.lock.RLock()
if !node.Model.Aria2Enabled {
node.lock.RUnlock()
return &common.DummyAria2{}
}
if !node.aria2RPC.Initialized {
node.lock.RUnlock()
node.aria2RPC.Init()
return &common.DummyAria2{}
}
defer node.lock.RUnlock()
return &node.aria2RPC
}
func (node *MasterNode) IsMater() bool {
return true
}
func (node *MasterNode) DBModel() *model.Node {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Model
}
func (r *rpcService) Init() error {
r.parent.lock.Lock()
defer r.parent.lock.Unlock()
r.Initialized = false
// 客户端已存在,则关闭先前连接
if r.Caller != nil {
r.Caller.Close()
}
// 解析RPC服务地址
server, err := url.Parse(r.parent.Model.Aria2OptionsSerialized.Server)
if err != nil {
util.Log().Warning("无法解析主机 Aria2 RPC 服务地址,%s", err)
return err
}
server.Path = "/jsonrpc"
// 加载自定义下载配置
var globalOptions map[string]interface{}
if r.parent.Model.Aria2OptionsSerialized.Options != "" {
err = json.Unmarshal([]byte(r.parent.Model.Aria2OptionsSerialized.Options), &globalOptions)
if err != nil {
util.Log().Warning("无法解析主机 Aria2 配置,%s", err)
return err
}
}
r.options = &clientOptions{
Options: globalOptions,
}
timeout := r.parent.Model.Aria2OptionsSerialized.Timeout
caller, err := rpc.New(context.Background(), server.String(), r.parent.Model.Aria2OptionsSerialized.Token, time.Duration(timeout)*time.Second, mq.GlobalMQ)
r.Caller = caller
r.Initialized = err == nil
return err
}
func (r *rpcService) CreateTask(task *model.Download, groupOptions map[string]interface{}) (string, error) {
r.parent.lock.RLock()
// 生成存储路径
guid, _ := uuid.NewV4()
path := filepath.Join(
r.parent.Model.Aria2OptionsSerialized.TempPath,
"aria2",
guid.String(),
)
r.parent.lock.RUnlock()
// 创建下载任务
options := map[string]interface{}{
"dir": path,
}
for k, v := range r.options.Options {
options[k] = v
}
for k, v := range groupOptions {
options[k] = v
}
gid, err := r.Caller.AddURI(task.Source, options)
if err != nil || gid == "" {
return "", err
}
return gid, nil
}
func (r *rpcService) Status(task *model.Download) (rpc.StatusInfo, error) {
res, err := r.Caller.TellStatus(task.GID)
if err != nil {
// 失败后重试
util.Log().Debug("无法获取离线下载状态,%s10秒钟后重试", err)
time.Sleep(time.Duration(10) * time.Second)
res, err = r.Caller.TellStatus(task.GID)
}
return res, err
}
func (r *rpcService) Cancel(task *model.Download) error {
// 取消下载任务
_, err := r.Caller.Remove(task.GID)
if err != nil {
util.Log().Warning("无法取消离线下载任务[%s], %s", task.GID, err)
}
return err
}
func (r *rpcService) Select(task *model.Download, files []int) error {
var selected = make([]string, len(files))
for i := 0; i < len(files); i++ {
selected[i] = strconv.Itoa(files[i])
}
_, err := r.Caller.ChangeOption(task.GID, map[string]interface{}{"select-file": strings.Join(selected, ",")})
return err
}
func (r *rpcService) GetConfig() model.Aria2Option {
r.parent.lock.RLock()
defer r.parent.lock.RUnlock()
return r.parent.Model.Aria2OptionsSerialized
}
func (s *rpcService) DeleteTempFile(task *model.Download) error {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
// 避免被aria2占用异步执行删除
go func(src string) {
time.Sleep(deleteTempFileDuration)
err := os.RemoveAll(src)
if err != nil {
util.Log().Warning("无法删除离线下载临时目录[%s], %s", src, err)
}
}(task.Parent)
return nil
}

60
pkg/cluster/node.go Normal file
View File

@@ -0,0 +1,60 @@
package cluster
import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
)
type Node interface {
// Init a node from database model
Init(node *model.Node)
// Check if given feature is enabled
IsFeatureEnabled(feature string) bool
// Subscribe node status change to a callback function
SubscribeStatusChange(callback func(isActive bool, id uint))
// Ping the node
Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error)
// Returns if the node is active
IsActive() bool
// Get instances for aria2 calls
GetAria2Instance() common.Aria2
// Returns unique id of this node
ID() uint
// Kill node and recycle resources
Kill()
// Returns if current node is master node
IsMater() bool
// Get auth instance used to check RPC call from slave to master
MasterAuthInstance() auth.Auth
// Get auth instance used to check RPC call from master to slave
SlaveAuthInstance() auth.Auth
// Get node DB model
DBModel() *model.Node
}
// Create new node from DB model
func NewNodeFromDBModel(node *model.Node) Node {
switch node.Type {
case model.SlaveNodeType:
slave := &SlaveNode{}
slave.Init(node)
return slave
default:
master := &MasterNode{}
master.Init(node)
return master
}
}

176
pkg/cluster/pool.go Normal file
View File

@@ -0,0 +1,176 @@
package cluster
import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/balancer"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"sync"
)
var Default *NodePool
// 需要分类的节点组
var featureGroup = []string{"aria2"}
// Pool 节点池
type Pool interface {
// Returns active node selected by given feature and load balancer
BalanceNodeByFeature(feature string, lb balancer.Balancer) (error, Node)
// Returns node by ID
GetNodeByID(id uint) Node
// Add given node into pool. If node existed, refresh node.
Add(node *model.Node)
// Delete and kill node from pool by given node id
Delete(id uint)
}
// NodePool 通用节点池
type NodePool struct {
active map[uint]Node
inactive map[uint]Node
featureMap map[string][]Node
lock sync.RWMutex
}
// Init 初始化从机节点池
func Init() {
Default = &NodePool{
featureMap: make(map[string][]Node),
}
if err := Default.initFromDB(); err != nil {
util.Log().Warning("节点池初始化失败, %s", err)
}
}
func (pool *NodePool) buildIndexMap() {
pool.lock.Lock()
for _, feature := range featureGroup {
pool.featureMap[feature] = make([]Node, 0)
}
for _, v := range pool.active {
for _, feature := range featureGroup {
if v.IsFeatureEnabled(feature) {
pool.featureMap[feature] = append(pool.featureMap[feature], v)
}
}
}
pool.lock.Unlock()
}
func (pool *NodePool) GetNodeByID(id uint) Node {
pool.lock.RLock()
defer pool.lock.RUnlock()
if node, ok := pool.active[id]; ok {
return node
}
return pool.inactive[id]
}
func (pool *NodePool) nodeStatusChange(isActive bool, id uint) {
util.Log().Debug("从机节点 [ID=%d] 状态变更 [Active=%t]", id, isActive)
pool.lock.Lock()
if isActive {
node := pool.inactive[id]
delete(pool.inactive, id)
pool.active[id] = node
} else {
node := pool.active[id]
delete(pool.active, id)
pool.inactive[id] = node
}
pool.lock.Unlock()
pool.buildIndexMap()
}
func (pool *NodePool) initFromDB() error {
nodes, err := model.GetNodesByStatus(model.NodeActive)
if err != nil {
return err
}
pool.lock.Lock()
pool.active = make(map[uint]Node)
pool.inactive = make(map[uint]Node)
for i := 0; i < len(nodes); i++ {
pool.add(&nodes[i])
}
pool.lock.Unlock()
pool.buildIndexMap()
return nil
}
func (pool *NodePool) add(node *model.Node) {
newNode := NewNodeFromDBModel(node)
if newNode.IsActive() {
pool.active[node.ID] = newNode
} else {
pool.inactive[node.ID] = newNode
}
// 订阅节点状态变更
newNode.SubscribeStatusChange(func(isActive bool, id uint) {
pool.nodeStatusChange(isActive, id)
})
}
func (pool *NodePool) Add(node *model.Node) {
pool.lock.Lock()
defer pool.buildIndexMap()
defer pool.lock.Unlock()
if _, ok := pool.active[node.ID]; ok {
// TODO: refresh node
return
}
if _, ok := pool.inactive[node.ID]; ok {
return
}
pool.add(node)
}
func (pool *NodePool) Delete(id uint) {
pool.lock.Lock()
defer pool.buildIndexMap()
defer pool.lock.Unlock()
if node, ok := pool.active[id]; ok {
node.Kill()
delete(pool.active, id)
return
}
if node, ok := pool.inactive[id]; ok {
node.Kill()
delete(pool.inactive, id)
return
}
}
// BalanceNodeByFeature 根据 feature 和 LoadBalancer 取出节点
func (pool *NodePool) BalanceNodeByFeature(feature string, lb balancer.Balancer) (error, Node) {
pool.lock.RLock()
defer pool.lock.RUnlock()
if nodes, ok := pool.featureMap[feature]; ok {
err, res := lb.NextPeer(nodes)
if err == nil {
return nil, res.(Node)
}
return err, nil
}
return ErrFeatureNotExist, nil
}

405
pkg/cluster/slave.go Normal file
View File

@@ -0,0 +1,405 @@
package cluster
import (
"encoding/json"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"io"
"net/url"
"strings"
"sync"
"time"
)
type SlaveNode struct {
Model *model.Node
Active bool
caller slaveCaller
callback func(bool, uint)
close chan bool
lock sync.RWMutex
}
type slaveCaller struct {
parent *SlaveNode
Client request.Client
}
// Init 初始化节点
func (node *SlaveNode) Init(nodeModel *model.Node) {
node.lock.Lock()
defer node.lock.Unlock()
node.Model = nodeModel
// Init http request client
var endpoint *url.URL
if serverURL, err := url.Parse(node.Model.Server); err == nil {
var controller *url.URL
controller, _ = url.Parse("/api/v3/slave")
endpoint = serverURL.ResolveReference(controller)
}
signTTL := model.GetIntSetting("slave_api_timeout", 60)
node.caller.Client = request.NewClient(
request.WithMasterMeta(),
request.WithTimeout(time.Duration(signTTL)*time.Second),
request.WithCredential(auth.HMACAuth{SecretKey: []byte(nodeModel.SlaveKey)}, int64(signTTL)),
request.WithEndpoint(endpoint.String()),
)
node.caller.parent = node
node.Active = true
if node.close != nil {
node.close <- true
}
go node.StartPingLoop()
}
// IsFeatureEnabled 查询节点的某项功能是否启用
func (node *SlaveNode) IsFeatureEnabled(feature string) bool {
node.lock.RLock()
defer node.lock.RUnlock()
switch feature {
case "aria2":
return node.Model.Aria2Enabled
default:
return false
}
}
// SubscribeStatusChange 订阅节点状态更改
func (node *SlaveNode) SubscribeStatusChange(callback func(bool, uint)) {
node.lock.Lock()
node.callback = callback
node.lock.Unlock()
}
// Ping 从机节点,返回从机负载
func (node *SlaveNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error) {
reqBodyEncoded, err := json.Marshal(req)
if err != nil {
return nil, err
}
bodyReader := strings.NewReader(string(reqBodyEncoded))
resp, err := node.caller.Client.Request(
"POST",
"heartbeat",
bodyReader,
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return nil, err
}
// 处理列取结果
if resp.Code != 0 {
return nil, serializer.NewErrorFromResponse(resp)
}
var res serializer.NodePingResp
if resStr, ok := resp.Data.(string); ok {
err = json.Unmarshal([]byte(resStr), &res)
if err != nil {
return nil, err
}
}
return &res, nil
}
// IsActive 返回节点是否在线
func (node *SlaveNode) IsActive() bool {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Active
}
// Kill 结束节点内相关循环
func (node *SlaveNode) Kill() {
node.lock.RLock()
defer node.lock.RUnlock()
if node.close != nil {
close(node.close)
}
}
// GetAria2Instance 获取从机Aria2实例
func (node *SlaveNode) GetAria2Instance() common.Aria2 {
node.lock.RLock()
defer node.lock.RUnlock()
if !node.Model.Aria2Enabled {
return &common.DummyAria2{}
}
return &node.caller
}
func (node *SlaveNode) ID() uint {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Model.ID
}
func (node *SlaveNode) StartPingLoop() {
node.lock.Lock()
node.close = make(chan bool)
node.lock.Unlock()
tickDuration := time.Duration(model.GetIntSetting("slave_ping_interval", 300)) * time.Second
recoverDuration := time.Duration(model.GetIntSetting("slave_recover_interval", 600)) * time.Second
pingTicker := time.Duration(0)
util.Log().Debug("从机节点 [%s] 启动心跳循环", node.Model.Name)
retry := 0
recoverMode := false
isFirstLoop := true
loop:
for {
select {
case <-time.After(pingTicker):
if pingTicker == 0 {
pingTicker = tickDuration
}
util.Log().Debug("从机节点 [%s] 发送Ping", node.Model.Name)
res, err := node.Ping(node.getHeartbeatContent(isFirstLoop))
isFirstLoop = false
if err != nil {
util.Log().Debug("Ping从机节点 [%s] 时发生错误: %s", node.Model.Name, err)
retry++
if retry >= model.GetIntSetting("slave_node_retry", 3) {
util.Log().Debug("从机节点 [%s] Ping 重试已达到最大限制,将从机节点标记为不可用", node.Model.Name)
node.changeStatus(false)
if !recoverMode {
// 启动恢复监控循环
util.Log().Debug("从机节点 [%s] 进入恢复模式", node.Model.Name)
pingTicker = recoverDuration
recoverMode = true
}
}
} else {
if recoverMode {
util.Log().Debug("从机节点 [%s] 复活", node.Model.Name)
pingTicker = tickDuration
recoverMode = false
isFirstLoop = true
}
util.Log().Debug("从机节点 [%s] 状态: %s", node.Model.Name, res)
node.changeStatus(true)
retry = 0
}
case <-node.close:
util.Log().Debug("从机节点 [%s] 收到关闭信号", node.Model.Name)
break loop
}
}
}
func (node *SlaveNode) IsMater() bool {
return false
}
func (node *SlaveNode) MasterAuthInstance() auth.Auth {
node.lock.RLock()
defer node.lock.RUnlock()
return auth.HMACAuth{SecretKey: []byte(node.Model.MasterKey)}
}
func (node *SlaveNode) SlaveAuthInstance() auth.Auth {
node.lock.RLock()
defer node.lock.RUnlock()
return auth.HMACAuth{SecretKey: []byte(node.Model.SlaveKey)}
}
func (node *SlaveNode) DBModel() *model.Node {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Model
}
// getHeartbeatContent gets serializer.NodePingReq used to send heartbeat to slave
func (node *SlaveNode) getHeartbeatContent(isUpdate bool) *serializer.NodePingReq {
return &serializer.NodePingReq{
SiteURL: model.GetSiteURL().String(),
IsUpdate: isUpdate,
SiteID: model.GetSettingByName("siteID"),
Node: node.Model,
CredentialTTL: model.GetIntSetting("slave_api_timeout", 60),
}
}
func (node *SlaveNode) changeStatus(isActive bool) {
node.lock.RLock()
id := node.Model.ID
if isActive != node.Active {
node.lock.RUnlock()
node.lock.Lock()
node.Active = isActive
node.lock.Unlock()
node.callback(isActive, id)
} else {
node.lock.RUnlock()
}
}
func (s *slaveCaller) Init() error {
return nil
}
// SendAria2Call send remote aria2 call to slave node
func (s *slaveCaller) SendAria2Call(body *serializer.SlaveAria2Call, scope string) (*serializer.Response, error) {
reqReader, err := getAria2RequestBody(body)
if err != nil {
return nil, err
}
return s.Client.Request(
"POST",
"aria2/"+scope,
reqReader,
).CheckHTTPResponse(200).DecodeResponse()
}
func (s *slaveCaller) CreateTask(task *model.Download, options map[string]interface{}) (string, error) {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
req := &serializer.SlaveAria2Call{
Task: task,
GroupOptions: options,
}
res, err := s.SendAria2Call(req, "task")
if err != nil {
return "", err
}
if res.Code != 0 {
return "", serializer.NewErrorFromResponse(res)
}
return res.Data.(string), err
}
func (s *slaveCaller) Status(task *model.Download) (rpc.StatusInfo, error) {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
req := &serializer.SlaveAria2Call{
Task: task,
}
res, err := s.SendAria2Call(req, "status")
if err != nil {
return rpc.StatusInfo{}, err
}
if res.Code != 0 {
return rpc.StatusInfo{}, serializer.NewErrorFromResponse(res)
}
var status rpc.StatusInfo
res.GobDecode(&status)
return status, err
}
func (s *slaveCaller) Cancel(task *model.Download) error {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
req := &serializer.SlaveAria2Call{
Task: task,
}
res, err := s.SendAria2Call(req, "cancel")
if err != nil {
return err
}
if res.Code != 0 {
return serializer.NewErrorFromResponse(res)
}
return nil
}
func (s *slaveCaller) Select(task *model.Download, files []int) error {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
req := &serializer.SlaveAria2Call{
Task: task,
Files: files,
}
res, err := s.SendAria2Call(req, "select")
if err != nil {
return err
}
if res.Code != 0 {
return serializer.NewErrorFromResponse(res)
}
return nil
}
func (s *slaveCaller) GetConfig() model.Aria2Option {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
return s.parent.Model.Aria2OptionsSerialized
}
func (s *slaveCaller) DeleteTempFile(task *model.Download) error {
s.parent.lock.RLock()
defer s.parent.lock.RUnlock()
req := &serializer.SlaveAria2Call{
Task: task,
}
res, err := s.SendAria2Call(req, "delete")
if err != nil {
return err
}
if res.Code != 0 {
return serializer.NewErrorFromResponse(res)
}
return nil
}
func getAria2RequestBody(body *serializer.SlaveAria2Call) (io.Reader, error) {
reqBodyEncoded, err := json.Marshal(body)
if err != nil {
return nil, err
}
return strings.NewReader(string(reqBodyEncoded)), nil
}

View File

@@ -0,0 +1,39 @@
package driver
import (
"context"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"io"
"net/url"
)
// Handler 存储策略适配器
type Handler interface {
// 上传文件, dst为文件存储路径size 为文件大小。上下文关闭
// 时,应取消上传并清理临时文件
Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error
// 删除一个或多个给定路径的文件,返回删除失败的文件路径列表及错误
Delete(ctx context.Context, files []string) ([]string, error)
// 获取文件内容
Get(ctx context.Context, path string) (response.RSCloser, error)
// 获取缩略图可直接在ContentResponse中返回文件数据流也可指
// 定为重定向
Thumb(ctx context.Context, path string) (*response.ContentResponse, error)
// 获取外链/下载地址,
// url - 站点本身地址,
// isDownload - 是否直接下载
Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error)
// Token 获取有效期为ttl的上传凭证和签名同时回调会话有效期为sessionTTL
Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error)
// List 递归列取远程端path路径下文件、目录不包含path本身
// 返回的对象路径以path作为起始根目录.
// recursive - 是否递归列出
List(ctx context.Context, path string, recursive bool) ([]response.Object, error)
}

View File

@@ -55,7 +55,7 @@ func NewClient(policy *model.Policy) (*Client, error) {
ClientID: policy.BucketName,
ClientSecret: policy.SecretKey,
Redirect: policy.OptionsSerialized.OdRedirect,
Request: request.HTTPClient{},
Request: request.NewClient(),
}
if client.Endpoints.DriverResource == "" {

View File

@@ -14,6 +14,7 @@ import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/cache"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
@@ -27,6 +28,16 @@ type Driver struct {
HTTPClient request.Client
}
// NewDriver 从存储策略初始化新的Driver实例
func NewDriver(policy *model.Policy) (driver.Handler, error) {
client, err := NewClient(policy)
return Driver{
Policy: policy,
Client: client,
HTTPClient: request.NewClient(),
}, err
}
// List 列取项目
func (handler Driver) List(ctx context.Context, base string, recursive bool) ([]response.Object, error) {
base = strings.TrimPrefix(base, "/")

View File

@@ -0,0 +1,25 @@
package onedrive
import "sync"
// CredentialLock 针对存储策略凭证的锁
type CredentialLock interface {
Lock(uint)
Unlock(uint)
}
var GlobalMutex = mutexMap{}
type mutexMap struct {
locks sync.Map
}
func (m *mutexMap) Lock(id uint) {
lock, _ := m.locks.LoadOrStore(id, &sync.Mutex{})
lock.(*sync.Mutex).Lock()
}
func (m *mutexMap) Unlock(id uint) {
lock, _ := m.locks.LoadOrStore(id, &sync.Mutex{})
lock.(*sync.Mutex).Unlock()
}

View File

@@ -10,7 +10,9 @@ import (
"time"
"github.com/cloudreve/Cloudreve/v3/pkg/cache"
"github.com/cloudreve/Cloudreve/v3/pkg/conf"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/slave"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
)
@@ -124,6 +126,13 @@ func (client *Client) ObtainToken(ctx context.Context, opts ...Option) (*Credent
// UpdateCredential 更新凭证,并检查有效期
func (client *Client) UpdateCredential(ctx context.Context) error {
if conf.SystemConfig.Mode == "slave" {
return client.fetchCredentialFromMaster(ctx)
}
GlobalMutex.Lock(client.Policy.ID)
defer GlobalMutex.Unlock(client.Policy.ID)
// 如果已存在凭证
if client.Credential != nil && client.Credential.AccessToken != "" {
// 检查已有凭证是否过期
@@ -160,11 +169,21 @@ func (client *Client) UpdateCredential(ctx context.Context) error {
client.Credential = credential
// 更新存储策略的 RefreshToken
client.Policy.AccessKey = credential.RefreshToken
client.Policy.SaveAndClearCache()
client.Policy.UpdateAccessKeyAndClearCache(credential.RefreshToken)
// 更新缓存
cache.Set("onedrive_"+client.ClientID, *credential, int(expires))
return nil
}
// UpdateCredential 更新凭证,并检查有效期
func (client *Client) fetchCredentialFromMaster(ctx context.Context) error {
res, err := slave.DefaultController.GetOneDriveToken(client.Policy.MasterID, client.Policy.ID)
if err != nil {
return err
}
client.Credential = &Credential{AccessToken: res}
return nil
}

View File

@@ -42,7 +42,7 @@ func GetPublicKey(r *http.Request) ([]byte, error) {
}
// 获取公钥
client := request.HTTPClient{}
client := request.NewClient()
body, err := client.Request("GET", string(pubURL), nil).
CheckHTTPResponse(200).
GetResponse()

View File

@@ -292,7 +292,7 @@ func TestDriver_Get(t *testing.T) {
BucketName: "test",
Server: "oss-cn-shanghai.aliyuncs.com",
},
HTTPClient: request.HTTPClient{},
HTTPClient: request.NewClient(),
}
cache.Set("setting_preview_timeout", "3600", 0)

View File

@@ -49,6 +49,7 @@ func (handler Driver) List(ctx context.Context, path string, recursive bool) ([]
handler.getAPIUrl("list"),
bodyReader,
request.WithCredential(handler.AuthInstance, int64(signTTL)),
request.WithMasterMeta(),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return res, err
@@ -97,7 +98,7 @@ func (handler Driver) getAPIUrl(scope string, routes ...string) string {
// Get 获取文件内容
func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
// 尝试获取速度限制 TODO 是否需要在这里限制?
// 尝试获取速度限制
speedLimit := 0
if user, ok := ctx.Value(fsctx.UserCtx).(model.User); ok {
speedLimit = user.Group.SpeedLimit
@@ -116,6 +117,7 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser,
nil,
request.WithContext(ctx),
request.WithTimeout(time.Duration(0)),
request.WithMasterMeta(),
).CheckHTTPResponse(200).GetRSCloser()
if err != nil {
return nil, err
@@ -168,13 +170,15 @@ func (handler Driver) Put(ctx context.Context, file io.ReadCloser, dst string, s
handler.Policy.GetUploadURL(),
file,
request.WithHeader(map[string][]string{
"Authorization": {credential.Token},
"X-Policy": {credential.Policy},
"X-FileName": {fileName},
"X-Overwrite": {overwrite},
"X-Policy": {credential.Policy},
"X-FileName": {fileName},
"X-Overwrite": {overwrite},
}),
request.WithContentLength(int64(size)),
request.WithTimeout(time.Duration(0)),
request.WithMasterMeta(),
request.WithSlaveMeta(handler.Policy.AccessKey),
request.WithCredential(handler.AuthInstance, int64(credentialTTL)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
@@ -206,6 +210,8 @@ func (handler Driver) Delete(ctx context.Context, files []string) ([]string, err
handler.getAPIUrl("delete"),
bodyReader,
request.WithCredential(handler.AuthInstance, int64(signTTL)),
request.WithMasterMeta(),
request.WithSlaveMeta(handler.Policy.AccessKey),
).CheckHTTPResponse(200).GetResponse()
if err != nil {
return files, err

View File

@@ -172,7 +172,7 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser,
}
// 获取文件数据流
client := request.HTTPClient{}
client := request.NewClient()
resp, err := client.Request(
"GET",
downloadURL,

View File

@@ -0,0 +1,7 @@
package masterinslave
import "errors"
var (
ErrNotImplemented = errors.New("this method of shadowed policy is not implemented")
)

View File

@@ -0,0 +1,56 @@
package masterinslave
import (
"context"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"io"
"net/url"
)
// Driver 影子存储策略,用于在从机端上传文件
type Driver struct {
master cluster.Node
handler driver.Handler
policy *model.Policy
}
// NewDriver 返回新的处理器
func NewDriver(master cluster.Node, handler driver.Handler, policy *model.Policy) driver.Handler {
return &Driver{
master: master,
handler: handler,
policy: policy,
}
}
func (d *Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
return d.handler.Put(ctx, file, dst, size)
}
func (d *Driver) Delete(ctx context.Context, files []string) ([]string, error) {
return d.handler.Delete(ctx, files)
}
func (d *Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
return nil, ErrNotImplemented
}
func (d *Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
return nil, ErrNotImplemented
}
func (d *Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) {
return "", ErrNotImplemented
}
func (d *Driver) Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) {
return serializer.UploadCredential{}, ErrNotImplemented
}
func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
return nil, ErrNotImplemented
}

View File

@@ -0,0 +1,9 @@
package slaveinmaster
import "errors"
var (
ErrNotImplemented = errors.New("this method of shadowed policy is not implemented")
ErrSlaveSrcPathNotExist = errors.New("cannot determine source file path in slave node")
ErrWaitResultTimeout = errors.New("timeout waiting for slave transfer result")
)

View File

@@ -0,0 +1,121 @@
package slaveinmaster
import (
"bytes"
"context"
"encoding/json"
"errors"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"io"
"net/url"
"time"
)
// Driver 影子存储策略,将上传任务指派给从机节点处理,并等待从机通知上传结果
type Driver struct {
node cluster.Node
handler driver.Handler
policy *model.Policy
client request.Client
}
// NewDriver 返回新的从机指派处理器
func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy) driver.Handler {
var endpoint *url.URL
if serverURL, err := url.Parse(node.DBModel().Server); err == nil {
var controller *url.URL
controller, _ = url.Parse("/api/v3/slave")
endpoint = serverURL.ResolveReference(controller)
}
signTTL := model.GetIntSetting("slave_api_timeout", 60)
return &Driver{
node: node,
handler: handler,
policy: policy,
client: request.NewClient(
request.WithMasterMeta(),
request.WithTimeout(time.Duration(signTTL)*time.Second),
request.WithCredential(node.SlaveAuthInstance(), int64(signTTL)),
request.WithEndpoint(endpoint.String()),
),
}
}
// Put 将ctx中指定的从机物理文件由从机上传到目标存储策略
func (d *Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
src, ok := ctx.Value(fsctx.SlaveSrcPath).(string)
if !ok {
return ErrSlaveSrcPathNotExist
}
req := serializer.SlaveTransferReq{
Src: src,
Dst: dst,
Policy: d.policy,
}
body, err := json.Marshal(req)
if err != nil {
return err
}
// 订阅转存结果
resChan := mq.GlobalMQ.Subscribe(req.Hash(model.GetSettingByName("siteID")), 0)
defer mq.GlobalMQ.Unsubscribe(req.Hash(model.GetSettingByName("siteID")), resChan)
res, err := d.client.Request("PUT", "task/transfer", bytes.NewReader(body)).
CheckHTTPResponse(200).
DecodeResponse()
if err != nil {
return err
}
if res.Code != 0 {
return serializer.NewErrorFromResponse(res)
}
// 等待转存结果或者超时
waitTimeout := model.GetIntSetting("slave_transfer_timeout", 172800)
select {
case <-time.After(time.Duration(waitTimeout) * time.Second):
return ErrWaitResultTimeout
case msg := <-resChan:
if msg.Event != serializer.SlaveTransferSuccess {
return errors.New(msg.Content.(serializer.SlaveTransferResult).Error)
}
}
return nil
}
func (d *Driver) Delete(ctx context.Context, files []string) ([]string, error) {
return d.handler.Delete(ctx, files)
}
func (d *Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
return nil, ErrNotImplemented
}
func (d *Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
return nil, ErrNotImplemented
}
func (d *Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) {
return "", ErrNotImplemented
}
func (d *Driver) Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) {
return serializer.UploadCredential{}, ErrNotImplemented
}
func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
return nil, ErrNotImplemented
}

View File

@@ -1,8 +1,12 @@
package filesystem
import (
"context"
"errors"
"fmt"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/masterinslave"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/slaveinmaster"
"io"
"net/http"
"net/url"
@@ -19,7 +23,6 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/remote"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/s3"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/upyun"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/gin-gonic/gin"
@@ -43,36 +46,6 @@ type FileHeader interface {
GetVirtualPath() string
}
// Handler 存储策略适配器
type Handler interface {
// 上传文件, dst为文件存储路径size 为文件大小。上下文关闭
// 时,应取消上传并清理临时文件
Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error
// 删除一个或多个给定路径的文件,返回删除失败的文件路径列表及错误
Delete(ctx context.Context, files []string) ([]string, error)
// 获取文件内容
Get(ctx context.Context, path string) (response.RSCloser, error)
// 获取缩略图可直接在ContentResponse中返回文件数据流也可指
// 定为重定向
Thumb(ctx context.Context, path string) (*response.ContentResponse, error)
// 获取外链/下载地址,
// url - 站点本身地址,
// isDownload - 是否直接下载
Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error)
// Token 获取有效期为ttl的上传凭证和签名同时回调会话有效期为sessionTTL
Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error)
// List 递归列取远程端path路径下文件、目录不包含path本身
// 返回的对象路径以path作为起始根目录.
// recursive - 是否递归列出
List(ctx context.Context, path string, recursive bool) ([]response.Object, error)
}
// FileSystem 管理文件的文件系统
type FileSystem struct {
// 文件系统所有者
@@ -96,7 +69,7 @@ type FileSystem struct {
/*
文件系统处理适配器
*/
Handler Handler
Handler driver.Handler
// 回收锁
recycleLock sync.Mutex
@@ -134,7 +107,6 @@ func NewFileSystem(user *model.User) (*FileSystem, error) {
// 分配存储策略适配器
err := fs.DispatchHandler()
// TODO 分配默认钩子
return fs, err
}
@@ -159,7 +131,6 @@ func NewAnonymousFileSystem() (*FileSystem, error) {
}
// DispatchHandler 根据存储策略分配文件适配器
// TODO 完善测试
func (fs *FileSystem) DispatchHandler() error {
var policyType string
var currentPolicy *model.Policy
@@ -184,7 +155,7 @@ func (fs *FileSystem) DispatchHandler() error {
case "remote":
fs.Handler = remote.Driver{
Policy: currentPolicy,
Client: request.HTTPClient{},
Client: request.NewClient(),
AuthInstance: auth.HMACAuth{[]byte(currentPolicy.SecretKey)},
}
return nil
@@ -196,7 +167,7 @@ func (fs *FileSystem) DispatchHandler() error {
case "oss":
fs.Handler = oss.Driver{
Policy: currentPolicy,
HTTPClient: request.HTTPClient{},
HTTPClient: request.NewClient(),
}
return nil
case "upyun":
@@ -205,13 +176,9 @@ func (fs *FileSystem) DispatchHandler() error {
}
return nil
case "onedrive":
client, err := onedrive.NewClient(currentPolicy)
fs.Handler = onedrive.Driver{
Policy: currentPolicy,
Client: client,
HTTPClient: request.HTTPClient{},
}
return err
var odErr error
fs.Handler, odErr = onedrive.NewDriver(currentPolicy)
return odErr
case "cos":
u, _ := url.Parse(currentPolicy.Server)
b := &cossdk.BaseURL{BucketURL: u}
@@ -223,7 +190,7 @@ func (fs *FileSystem) DispatchHandler() error {
SecretKey: currentPolicy.SecretKey,
},
}),
HTTPClient: request.HTTPClient{},
HTTPClient: request.NewClient(),
}
return nil
case "s3":
@@ -272,6 +239,30 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) {
return fs, err
}
// SwitchToSlaveHandler 将负责上传的 Handler 切换为从机节点
func (fs *FileSystem) SwitchToSlaveHandler(node cluster.Node) {
fs.Handler = slaveinmaster.NewDriver(node, fs.Handler, &fs.User.Policy)
}
// SwitchToShadowHandler 将负责上传的 Handler 切换为从机节点转存使用的影子处理器
func (fs *FileSystem) SwitchToShadowHandler(master cluster.Node, masterURL, masterID string) {
switch fs.Policy.Type {
case "remote":
fs.Policy.Type = "local"
fs.DispatchHandler()
case "local":
fs.Policy.Type = "remote"
fs.Policy.Server = masterURL
fs.Policy.AccessKey = fmt.Sprintf("%d", master.ID())
fs.Policy.SecretKey = master.DBModel().MasterKey
fs.DispatchHandler()
case "onedrive":
fs.Policy.MasterID = masterID
}
fs.Handler = masterinslave.NewDriver(master, fs.Handler, fs.Policy)
}
// SetTargetFile 设置当前处理的目标文件
func (fs *FileSystem) SetTargetFile(files *[]model.File) {
if len(fs.FileTarget) == 0 {

View File

@@ -41,4 +41,6 @@ const (
ValidateCapacityOnceCtx
// 禁止上传时同名覆盖操作
DisableOverwrite
// 文件在从机节点中的路径
SlaveSrcPath
)

View File

@@ -228,12 +228,14 @@ func (fs *FileSystem) UploadFromStream(ctx context.Context, src io.ReadCloser, d
}
// UploadFromPath 将本机已有文件上传到用户的文件系统
func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string) error {
func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string, resetPolicy bool) error {
// 重设存储策略
fs.Policy = &fs.User.Policy
err := fs.DispatchHandler()
if err != nil {
return err
if resetPolicy {
fs.Policy = &fs.User.Policy
err := fs.DispatchHandler()
if err != nil {
return err
}
}
file, err := os.Open(util.RelativePath(src))

View File

@@ -226,13 +226,13 @@ func TestFileSystem_UploadFromPath(t *testing.T) {
// 文件不存在
{
err := fs.UploadFromPath(ctx, "test/not_exist", "/")
err := fs.UploadFromPath(ctx, "test/not_exist", "/", true)
asserts.Error(err)
}
// 文存在,上传失败
{
err := fs.UploadFromPath(ctx, "tests/test.zip", "/")
err := fs.UploadFromPath(ctx, "tests/test.zip", "/", true)
asserts.Error(err)
}
}

160
pkg/mq/mq.go Normal file
View File

@@ -0,0 +1,160 @@
package mq
import (
"encoding/gob"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"strconv"
"sync"
"time"
)
// Message 消息事件正文
type Message struct {
// 消息触发者
TriggeredBy string
// 事件标识
Event string
// 消息正文
Content interface{}
}
type CallbackFunc func(Message)
// MQ 消息队列
type MQ interface {
rpc.Notifier
// 发布一个消息
Publish(string, Message)
// 订阅一个消息主题
Subscribe(string, int) <-chan Message
// 订阅一个消息主题,注册触发回调函数
SubscribeCallback(string, CallbackFunc)
// 取消订阅一个消息主题
Unsubscribe(string, <-chan Message)
}
var GlobalMQ = NewMQ()
func NewMQ() MQ {
return &inMemoryMQ{
topics: make(map[string][]chan Message),
callbacks: make(map[string][]CallbackFunc),
}
}
func init() {
gob.Register(Message{})
gob.Register([]rpc.Event{})
}
type inMemoryMQ struct {
topics map[string][]chan Message
callbacks map[string][]CallbackFunc
sync.RWMutex
}
func (i *inMemoryMQ) Publish(topic string, message Message) {
i.RLock()
subscribersChan, okChan := i.topics[topic]
subscribersCallback, okCallback := i.callbacks[topic]
i.RUnlock()
if okChan {
go func(subscribersChan []chan Message) {
for i := 0; i < len(subscribersChan); i++ {
select {
case subscribersChan[i] <- message:
case <-time.After(time.Millisecond * 500):
}
}
}(subscribersChan)
}
if okCallback {
for i := 0; i < len(subscribersCallback); i++ {
go subscribersCallback[i](message)
}
}
}
func (i *inMemoryMQ) Subscribe(topic string, buffer int) <-chan Message {
ch := make(chan Message, buffer)
i.Lock()
i.topics[topic] = append(i.topics[topic], ch)
i.Unlock()
return ch
}
func (i *inMemoryMQ) SubscribeCallback(topic string, callbackFunc CallbackFunc) {
i.Lock()
i.callbacks[topic] = append(i.callbacks[topic], callbackFunc)
i.Unlock()
}
func (i *inMemoryMQ) Unsubscribe(topic string, sub <-chan Message) {
i.Lock()
defer i.Unlock()
subscribers, ok := i.topics[topic]
if !ok {
return
}
var newSubs []chan Message
for _, subscriber := range subscribers {
if subscriber == sub {
continue
}
newSubs = append(newSubs, subscriber)
}
i.topics[topic] = newSubs
}
func (i *inMemoryMQ) Aria2Notify(events []rpc.Event, status int) {
for _, event := range events {
i.Publish(event.Gid, Message{
TriggeredBy: event.Gid,
Event: strconv.FormatInt(int64(status), 10),
Content: events,
})
}
}
// OnDownloadStart 下载开始
func (i *inMemoryMQ) OnDownloadStart(events []rpc.Event) {
i.Aria2Notify(events, common.Downloading)
}
// OnDownloadPause 下载暂停
func (i *inMemoryMQ) OnDownloadPause(events []rpc.Event) {
i.Aria2Notify(events, common.Paused)
}
// OnDownloadStop 下载停止
func (i *inMemoryMQ) OnDownloadStop(events []rpc.Event) {
i.Aria2Notify(events, common.Canceled)
}
// OnDownloadComplete 下载完成
func (i *inMemoryMQ) OnDownloadComplete(events []rpc.Event) {
i.Aria2Notify(events, common.Complete)
}
// OnDownloadError 下载出错
func (i *inMemoryMQ) OnDownloadError(events []rpc.Event) {
i.Aria2Notify(events, common.Error)
}
// OnBtDownloadComplete BT下载完成
func (i *inMemoryMQ) OnBtDownloadComplete(events []rpc.Event) {
i.Aria2Notify(events, common.Complete)
}

149
pkg/mq/mq_test.go Normal file
View File

@@ -0,0 +1,149 @@
package mq
import (
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"
)
func TestPublishAndSubscribe(t *testing.T) {
t.Parallel()
asserts := assert.New(t)
mq := NewMQ()
// No subscriber
{
asserts.NotPanics(func() {
mq.Publish("No subscriber", Message{})
})
}
// One channel subscriber
{
topic := "One channel subscriber"
msg := Message{TriggeredBy: "Tester"}
notifier := mq.Subscribe(topic, 0)
mq.Publish(topic, msg)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Done()
msgRecv := <-notifier
asserts.Equal(msg, msgRecv)
}()
wg.Wait()
}
// two channel subscriber
{
topic := "two channel subscriber"
msg := Message{TriggeredBy: "Tester"}
notifier := mq.Subscribe(topic, 0)
notifier2 := mq.Subscribe(topic, 0)
mq.Publish(topic, msg)
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
wg.Done()
msgRecv := <-notifier
asserts.Equal(msg, msgRecv)
}()
go func() {
wg.Done()
msgRecv := <-notifier2
asserts.Equal(msg, msgRecv)
}()
wg.Wait()
}
// two channel subscriber, one timeout
{
topic := "two channel subscriber, one timeout"
msg := Message{TriggeredBy: "Tester"}
mq.Subscribe(topic, 0)
notifier2 := mq.Subscribe(topic, 0)
mq.Publish(topic, msg)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Done()
msgRecv := <-notifier2
asserts.Equal(msg, msgRecv)
}()
wg.Wait()
}
// two channel subscriber, one unsubscribe
{
topic := "two channel subscriber, one unsubscribe"
msg := Message{TriggeredBy: "Tester"}
mq.Subscribe(topic, 0)
notifier2 := mq.Subscribe(topic, 0)
notifier := mq.Subscribe(topic, 0)
mq.Unsubscribe(topic, notifier)
mq.Publish(topic, msg)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Done()
msgRecv := <-notifier2
asserts.Equal(msg, msgRecv)
}()
wg.Wait()
select {
case <-notifier:
t.Error()
default:
}
}
}
func TestAria2Interface(t *testing.T) {
t.Parallel()
asserts := assert.New(t)
mq := NewMQ()
var (
OnDownloadStart int
OnDownloadPause int
OnDownloadStop int
OnDownloadComplete int
OnDownloadError int
)
l := sync.Mutex{}
mq.SubscribeCallback("TestAria2Interface", func(message Message) {
asserts.Equal("TestAria2Interface", message.TriggeredBy)
l.Lock()
defer l.Unlock()
switch message.Event {
case "1":
OnDownloadStart++
case "2":
OnDownloadPause++
case "5":
OnDownloadStop++
case "4":
OnDownloadComplete++
case "3":
OnDownloadError++
}
})
mq.OnDownloadStart([]rpc.Event{{"TestAria2Interface"}, {"TestAria2Interface"}})
mq.OnDownloadPause([]rpc.Event{{"TestAria2Interface"}, {"TestAria2Interface"}})
mq.OnDownloadStop([]rpc.Event{{"TestAria2Interface"}, {"TestAria2Interface"}})
mq.OnDownloadComplete([]rpc.Event{{"TestAria2Interface"}, {"TestAria2Interface"}})
mq.OnDownloadError([]rpc.Event{{"TestAria2Interface"}, {"TestAria2Interface"}})
mq.OnBtDownloadComplete([]rpc.Event{{"TestAria2Interface"}, {"TestAria2Interface"}})
time.Sleep(time.Duration(500) * time.Millisecond)
asserts.Equal(2, OnDownloadStart)
asserts.Equal(2, OnDownloadPause)
asserts.Equal(2, OnDownloadStop)
asserts.Equal(4, OnDownloadComplete)
asserts.Equal(2, OnDownloadError)
}

110
pkg/request/options.go Normal file
View File

@@ -0,0 +1,110 @@
package request
import (
"context"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"net/http"
"net/url"
"time"
)
// Option 发送请求的额外设置
type Option interface {
apply(*options)
}
type options struct {
timeout time.Duration
header http.Header
sign auth.Auth
signTTL int64
ctx context.Context
contentLength int64
masterMeta bool
endpoint *url.URL
slaveNodeID string
}
type optionFunc func(*options)
func (f optionFunc) apply(o *options) {
f(o)
}
func newDefaultOption() *options {
return &options{
header: http.Header{},
timeout: time.Duration(30) * time.Second,
contentLength: -1,
}
}
// WithTimeout 设置请求超时
func WithTimeout(t time.Duration) Option {
return optionFunc(func(o *options) {
o.timeout = t
})
}
// WithContext 设置请求上下文
func WithContext(c context.Context) Option {
return optionFunc(func(o *options) {
o.ctx = c
})
}
// WithCredential 对请求进行签名
func WithCredential(instance auth.Auth, ttl int64) Option {
return optionFunc(func(o *options) {
o.sign = instance
o.signTTL = ttl
})
}
// WithHeader 设置请求Header
func WithHeader(header http.Header) Option {
return optionFunc(func(o *options) {
for k, v := range header {
o.header[k] = v
}
})
}
// WithoutHeader 设置清除请求Header
func WithoutHeader(header []string) Option {
return optionFunc(func(o *options) {
for _, v := range header {
delete(o.header, v)
}
})
}
// WithContentLength 设置请求大小
func WithContentLength(s int64) Option {
return optionFunc(func(o *options) {
o.contentLength = s
})
}
// WithMasterMeta 请求时携带主机信息
func WithMasterMeta() Option {
return optionFunc(func(o *options) {
o.masterMeta = true
})
}
// WithSlaveMeta 请求时携带从机信息
func WithSlaveMeta(s string) Option {
return optionFunc(func(o *options) {
o.slaveNodeID = s
})
}
// Endpoint 使用同一的请求Endpoint
func WithEndpoint(endpoint string) Option {
endpointURL, _ := url.Parse(endpoint)
return optionFunc(func(o *options) {
o.endpoint = endpointURL
})
}

View File

@@ -1,23 +1,25 @@
package request
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"path"
"strings"
"sync"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/conf"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
)
// GeneralClient 通用 HTTP Client
var GeneralClient Client = HTTPClient{}
var GeneralClient Client = NewClient()
// Response 请求的响应或错误信息
type Response struct {
@@ -32,90 +34,30 @@ type Client interface {
// HTTPClient 实现 Client 接口
type HTTPClient struct {
mu sync.Mutex
options *options
}
// Option 发送请求的额外设置
type Option interface {
apply(*options)
}
type options struct {
timeout time.Duration
header http.Header
sign auth.Auth
signTTL int64
ctx context.Context
contentLength int64
}
type optionFunc func(*options)
func (f optionFunc) apply(o *options) {
f(o)
}
func newDefaultOption() *options {
return &options{
header: http.Header{},
timeout: time.Duration(30) * time.Second,
contentLength: -1,
func NewClient(opts ...Option) Client {
client := &HTTPClient{
options: newDefaultOption(),
}
}
// WithTimeout 设置请求超时
func WithTimeout(t time.Duration) Option {
return optionFunc(func(o *options) {
o.timeout = t
})
}
for _, o := range opts {
o.apply(client.options)
}
// WithContext 设置请求上下文
func WithContext(c context.Context) Option {
return optionFunc(func(o *options) {
o.ctx = c
})
}
// WithCredential 对请求进行签名
func WithCredential(instance auth.Auth, ttl int64) Option {
return optionFunc(func(o *options) {
o.sign = instance
o.signTTL = ttl
})
}
// WithHeader 设置请求Header
func WithHeader(header http.Header) Option {
return optionFunc(func(o *options) {
for k, v := range header {
o.header[k] = v
}
})
}
// WithoutHeader 设置清除请求Header
func WithoutHeader(header []string) Option {
return optionFunc(func(o *options) {
for _, v := range header {
delete(o.header, v)
}
})
}
// WithContentLength 设置请求大小
func WithContentLength(s int64) Option {
return optionFunc(func(o *options) {
o.contentLength = s
})
return client
}
// Request 发送HTTP请求
func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Option) *Response {
// 应用额外设置
options := newDefaultOption()
c.mu.Lock()
options := *c.options
c.mu.Unlock()
for _, o := range opts {
o.apply(options)
o.apply(&options)
}
// 创建请求客户端
@@ -126,6 +68,13 @@ func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Optio
body = nil
}
// 确定请求URL
if options.endpoint != nil {
targetURL := *options.endpoint
targetURL.Path = path.Join(targetURL.Path, target)
target = targetURL.String()
}
// 创建请求
var (
req *http.Request
@@ -141,14 +90,36 @@ func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Optio
}
// 添加请求相关设置
req.Header = options.header
if options.header != nil {
for k, v := range options.header {
req.Header.Add(k, strings.Join(v, " "))
}
}
if options.masterMeta && conf.SystemConfig.Mode == "master" {
req.Header.Add("X-Site-Url", model.GetSiteURL().String())
req.Header.Add("X-Site-Id", model.GetSettingByName("siteID"))
req.Header.Add("X-Cloudreve-Version", conf.BackendVersion)
}
if options.slaveNodeID != "" && conf.SystemConfig.Mode == "slave" {
req.Header.Add("X-Node-Id", options.slaveNodeID)
}
if options.contentLength != -1 {
req.ContentLength = options.contentLength
}
// 签名请求
if options.sign != nil {
auth.SignRequest(options.sign, req, options.signTTL)
switch method {
case "PUT", "POST", "PATCH":
auth.SignRequest(options.sign, req, options.signTTL)
default:
if resURL, err := auth.SignURI(options.sign, req.URL.String(), options.signTTL); err == nil {
req.URL = resURL
}
}
}
// 发送请求

View File

@@ -11,6 +11,7 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
)
// TODO: move to slave pkg
// RemoteCallback 发送远程存储策略上传回调请求
func RemoteCallback(url string, body serializer.UploadCallback) error {
callbackBody, err := json.Marshal(struct {

View File

@@ -5,16 +5,15 @@ import "encoding/json"
// RequestRawSign 待签名的HTTP请求
type RequestRawSign struct {
Path string
Policy string
Header string
Body string
}
// NewRequestSignString 返回JSON格式的待签名字符串
// TODO 测试
func NewRequestSignString(path, policy, body string) string {
func NewRequestSignString(path, header, body string) string {
req := RequestRawSign{
Path: path,
Policy: policy,
Header: header,
Body: body,
}
res, _ := json.Marshal(req)

View File

@@ -1,14 +1,9 @@
package serializer
import "github.com/gin-gonic/gin"
// Response 基础序列化器
type Response struct {
Code int `json:"code"`
Data interface{} `json:"data,omitempty"`
Msg string `json:"msg"`
Error string `json:"error,omitempty"`
}
import (
"errors"
"github.com/gin-gonic/gin"
)
// AppError 应用错误实现了error接口
type AppError struct {
@@ -17,7 +12,7 @@ type AppError struct {
RawError error
}
// NewError 返回新的错误对象 todo:测试 还有下面的
// NewError 返回新的错误对象
func NewError(code int, msg string, err error) AppError {
return AppError{
Code: code,
@@ -26,6 +21,15 @@ func NewError(code int, msg string, err error) AppError {
}
}
// NewErrorFromResponse 从 serializer.Response 构建错误
func NewErrorFromResponse(resp *Response) AppError {
return AppError{
Code: resp.Code,
Msg: resp.Msg,
RawError: errors.New(resp.Error),
}
}
// WithError 将应用error携带标准库中的error
func (err *AppError) WithError(raw error) AppError {
err.RawError = raw
@@ -66,6 +70,8 @@ const (
CodeGroupNotAllowed = 40007
// CodeAdminRequired 非管理用户组
CodeAdminRequired = 40008
// CodeMasterNotFound 主机节点未注册
CodeMasterNotFound = 40009
// CodeDBError 数据库操作失败
CodeDBError = 50001
// CodeEncryptError 加密失败

View File

@@ -0,0 +1,35 @@
package serializer
import (
"bytes"
"encoding/base64"
"encoding/gob"
)
// Response 基础序列化器
type Response struct {
Code int `json:"code"`
Data interface{} `json:"data,omitempty"`
Msg string `json:"msg"`
Error string `json:"error,omitempty"`
}
// NewResponseWithGobData 返回Data字段使用gob编码的Response
func NewResponseWithGobData(data interface{}) Response {
var w bytes.Buffer
encoder := gob.NewEncoder(&w)
if err := encoder.Encode(data); err != nil {
return Err(CodeInternalSetting, "无法编码返回结果", err)
}
return Response{Data: w.Bytes()}
}
// GobDecode 将 Response 正文解码至目标指针
func (r *Response) GobDecode(target interface{}) {
src := r.Data.(string)
raw := make([]byte, len(src)*len(src)/base64.StdEncoding.DecodedLen(len(src)))
base64.StdEncoding.Decode(raw, []byte(src))
decoder := gob.NewDecoder(bytes.NewBuffer(raw))
decoder.Decode(target)
}

View File

@@ -1,5 +1,12 @@
package serializer
import (
"crypto/sha1"
"encoding/gob"
"fmt"
model "github.com/cloudreve/Cloudreve/v3/models"
)
// RemoteDeleteRequest 远程策略删除接口请求正文
type RemoteDeleteRequest struct {
Files []string `json:"files"`
@@ -10,3 +17,51 @@ type ListRequest struct {
Path string `json:"path"`
Recursive bool `json:"recursive"`
}
// NodePingReq 从机节点Ping请求
type NodePingReq struct {
SiteURL string `json:"site_url"`
SiteID string `json:"site_id"`
IsUpdate bool `json:"is_update"`
CredentialTTL int `json:"credential_ttl"`
Node *model.Node `json:"node"`
}
// NodePingResp 从机节点Ping响应
type NodePingResp struct {
}
// SlaveAria2Call 从机有关Aria2的请求正文
type SlaveAria2Call struct {
Task *model.Download `json:"task"`
GroupOptions map[string]interface{} `json:"group_options"`
Files []int `json:"files"`
}
// SlaveTransferReq 从机中转任务创建请求
type SlaveTransferReq struct {
Src string `json:"src"`
Dst string `json:"dst"`
Policy *model.Policy `json:"policy"`
}
// Hash 返回创建请求的唯一标识,保持创建请求幂等
func (s *SlaveTransferReq) Hash(id string) string {
h := sha1.New()
h.Write([]byte(fmt.Sprintf("transfer-%s-%s-%s-%d", id, s.Src, s.Dst, s.Policy.ID)))
bs := h.Sum(nil)
return fmt.Sprintf("%x", bs)
}
const (
SlaveTransferSuccess = "success"
SlaveTransferFailed = "failed"
)
type SlaveTransferResult struct {
Error string
}
func init() {
gob.Register(SlaveTransferResult{})
}

7
pkg/slave/errors.go Normal file
View File

@@ -0,0 +1,7 @@
package slave
import "github.com/cloudreve/Cloudreve/v3/pkg/serializer"
var (
ErrMasterNotFound = serializer.NewError(serializer.CodeMasterNotFound, "未知的主机节点", nil)
)

209
pkg/slave/slave.go Normal file
View File

@@ -0,0 +1,209 @@
package slave
import (
"bytes"
"encoding/gob"
"fmt"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/jinzhu/gorm"
"net/url"
"sync"
)
var DefaultController Controller
// Controller controls communications between master and slave
type Controller interface {
// Handle heartbeat sent from master
HandleHeartBeat(*serializer.NodePingReq) (serializer.NodePingResp, error)
// Get Aria2 Instance by master node ID
GetAria2Instance(string) (common.Aria2, error)
// Send event change message to master node
SendNotification(string, string, mq.Message) error
// Submit async task into task pool
SubmitTask(string, interface{}, string, func(interface{})) error
// Get master node info
GetMasterInfo(string) (*MasterInfo, error)
// Get master OneDrive policy credential
GetOneDriveToken(string, uint) (string, error)
}
type slaveController struct {
masters map[string]MasterInfo
lock sync.RWMutex
}
// info of master node
type MasterInfo struct {
ID string
TTL int
URL *url.URL
// used to invoke aria2 rpc calls
Instance cluster.Node
Client request.Client
jobTracker map[string]bool
}
func Init() {
DefaultController = &slaveController{
masters: make(map[string]MasterInfo),
}
gob.Register(rpc.StatusInfo{})
}
func (c *slaveController) HandleHeartBeat(req *serializer.NodePingReq) (serializer.NodePingResp, error) {
c.lock.Lock()
defer c.lock.Unlock()
req.Node.AfterFind()
// close old node if exist
origin, ok := c.masters[req.SiteID]
if (ok && req.IsUpdate) || !ok {
if ok {
origin.Instance.Kill()
}
masterUrl, err := url.Parse(req.SiteURL)
if err != nil {
return serializer.NodePingResp{}, err
}
c.masters[req.SiteID] = MasterInfo{
ID: req.SiteID,
URL: masterUrl,
TTL: req.CredentialTTL,
Client: request.NewClient(
request.WithEndpoint(masterUrl.String()),
request.WithSlaveMeta(fmt.Sprintf("%d", req.Node.ID)),
request.WithCredential(auth.HMACAuth{
SecretKey: []byte(req.Node.MasterKey),
}, int64(req.CredentialTTL)),
),
jobTracker: make(map[string]bool),
Instance: cluster.NewNodeFromDBModel(&model.Node{
Model: gorm.Model{ID: req.Node.ID},
MasterKey: req.Node.MasterKey,
Type: model.MasterNodeType,
Aria2Enabled: req.Node.Aria2Enabled,
Aria2OptionsSerialized: req.Node.Aria2OptionsSerialized,
}),
}
}
return serializer.NodePingResp{}, nil
}
func (c *slaveController) GetAria2Instance(id string) (common.Aria2, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if node, ok := c.masters[id]; ok {
return node.Instance.GetAria2Instance(), nil
}
return nil, ErrMasterNotFound
}
func (c *slaveController) SendNotification(id, subject string, msg mq.Message) error {
c.lock.RLock()
if node, ok := c.masters[id]; ok {
c.lock.RUnlock()
body := bytes.Buffer{}
enc := gob.NewEncoder(&body)
if err := enc.Encode(&msg); err != nil {
return err
}
res, err := node.Client.Request(
"PUT",
fmt.Sprintf("/api/v3/slave/notification/%s", subject),
&body,
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
}
if res.Code != 0 {
return serializer.NewErrorFromResponse(res)
}
return nil
}
c.lock.RUnlock()
return ErrMasterNotFound
}
// SubmitTask 提交异步任务
func (c *slaveController) SubmitTask(id string, job interface{}, hash string, submitter func(interface{})) error {
c.lock.RLock()
defer c.lock.RUnlock()
if node, ok := c.masters[id]; ok {
if _, ok := node.jobTracker[hash]; ok {
// 任务已存在,直接返回
return nil
}
submitter(job)
return nil
}
return ErrMasterNotFound
}
// GetMasterInfo 获取主机节点信息
func (c *slaveController) GetMasterInfo(id string) (*MasterInfo, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if node, ok := c.masters[id]; ok {
return &node, nil
}
return nil, ErrMasterNotFound
}
// GetOneDriveToken 获取主机OneDrive凭证
func (c *slaveController) GetOneDriveToken(id string, policyID uint) (string, error) {
c.lock.RLock()
if node, ok := c.masters[id]; ok {
c.lock.RUnlock()
res, err := node.Client.Request(
"GET",
fmt.Sprintf("/api/v3/slave/credential/onedrive/%d", policyID),
nil,
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return "", err
}
if res.Code != 0 {
return "", serializer.NewErrorFromResponse(res)
}
return res.Data.(string), nil
}
c.lock.RUnlock()
return "", ErrMasterNotFound
}

View File

@@ -106,7 +106,7 @@ func (job *CompressTask) Do() {
job.TaskModel.SetProgress(TransferringProgress)
// 上传文件
err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst)
err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst, true)
if err != nil {
job.SetErrorMsg(err.Error())
return

View File

@@ -96,7 +96,9 @@ func Resume() {
continue
}
TaskPoll.Submit(job)
if job != nil {
TaskPoll.Submit(job)
}
}
}

View File

@@ -2,6 +2,7 @@ package task
import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/conf"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
)
@@ -56,5 +57,7 @@ func Init() {
TaskPoll.Add(maxWorker)
util.Log().Info("初始化任务队列WorkerNum = %d", maxWorker)
Resume()
if conf.SystemConfig.Mode == "master" {
Resume()
}
}

View File

@@ -0,0 +1,145 @@
package slavetask
import (
"context"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/slave"
"github.com/cloudreve/Cloudreve/v3/pkg/task"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"os"
"path/filepath"
)
// TransferTask 文件中转任务
type TransferTask struct {
Err *task.JobError
Req *serializer.SlaveTransferReq
MasterID string
}
// Props 获取任务属性
func (job *TransferTask) Props() string {
return ""
}
// Type 获取任务类型
func (job *TransferTask) Type() int {
return 0
}
// Creator 获取创建者ID
func (job *TransferTask) Creator() uint {
return 0
}
// Model 获取任务的数据库模型
func (job *TransferTask) Model() *model.Task {
return nil
}
// SetStatus 设定状态
func (job *TransferTask) SetStatus(status int) {
}
// SetError 设定任务失败信息
func (job *TransferTask) SetError(err *task.JobError) {
job.Err = err
}
// SetErrorMsg 设定任务失败信息
func (job *TransferTask) SetErrorMsg(msg string, err error) {
jobErr := &task.JobError{Msg: msg}
if err != nil {
jobErr.Error = err.Error()
}
job.SetError(jobErr)
notifyMsg := mq.Message{
TriggeredBy: job.MasterID,
Event: serializer.SlaveTransferFailed,
Content: serializer.SlaveTransferResult{
Error: err.Error(),
},
}
if err := slave.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), notifyMsg); err != nil {
util.Log().Warning("无法发送转存失败通知到从机, ", err)
}
}
// GetError 返回任务失败信息
func (job *TransferTask) GetError() *task.JobError {
return job.Err
}
// Do 开始执行任务
func (job *TransferTask) Do() {
defer job.Recycle()
fs, err := filesystem.NewAnonymousFileSystem()
if err != nil {
job.SetErrorMsg("无法初始化匿名文件系统", err)
return
}
fs.Policy = job.Req.Policy
if err := fs.DispatchHandler(); err != nil {
job.SetErrorMsg("无法分发存储策略", err)
return
}
master, err := slave.DefaultController.GetMasterInfo(job.MasterID)
if err != nil {
job.SetErrorMsg("找不到主机节点", err)
return
}
fs.SwitchToShadowHandler(master.Instance, master.URL.String(), master.ID)
ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true)
file, err := os.Open(util.RelativePath(job.Req.Src))
if err != nil {
job.SetErrorMsg("无法读取源文件", err)
return
}
defer file.Close()
// 获取源文件大小
fi, err := file.Stat()
if err != nil {
job.SetErrorMsg("无法获取源文件大小", err)
return
}
size := fi.Size()
err = fs.Handler.Put(ctx, file, job.Req.Dst, uint64(size))
if err != nil {
job.SetErrorMsg("文件上传失败", err)
return
}
msg := mq.Message{
TriggeredBy: job.MasterID,
Event: serializer.SlaveTransferSuccess,
Content: serializer.SlaveTransferResult{},
}
if err := slave.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), msg); err != nil {
util.Log().Warning("无法发送转存成功通知到从机, ", err)
}
}
// Recycle 回收临时文件
func (job *TransferTask) Recycle() {
err := os.RemoveAll(filepath.Dir(job.Req.Src))
if err != nil {
util.Log().Warning("无法删除中转临时目录[%s], %s", job.Req.Src, err)
}
}

View File

@@ -9,6 +9,7 @@ import (
"strings"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
@@ -26,11 +27,14 @@ type TransferTask struct {
// TransferProps 中转任务属性
type TransferProps struct {
Src []string `json:"src"` // 原始文件
Parent string `json:"parent"` // 父目录
Dst string `json:"dst"` // 目的目录ID
Src []string `json:"src"` // 原始文件
SrcSizes map[string]uint64 `json:"src_size"` // 原始文件的大小信息,从机转存时使用
Parent string `json:"parent"` // 父目录
Dst string `json:"dst"` // 目的目录ID
// 将会保留原始文件的目录结构Src 除去 Parent 开头作为最终路径
TrimPath bool `json:"trim_path"`
// 负责处理中专任务的节点ID
NodeID uint `json:"node_id"`
}
// Props 获取任务属性
@@ -104,7 +108,24 @@ func (job *TransferTask) Do() {
}
ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true)
err = fs.UploadFromPath(ctx, file, dst)
ctx = context.WithValue(ctx, fsctx.SlaveSrcPath, file)
if job.TaskProps.NodeID > 1 {
// 指定为从机中转
// 获取从机节点
node := cluster.Default.GetNodeByID(job.TaskProps.NodeID)
if node == nil {
job.SetErrorMsg("从机节点不可用", nil)
}
// 切换为从机节点处理上传
fs.SwitchToSlaveHandler(node)
err = fs.UploadFromStream(ctx, nil, dst, job.TaskProps.SrcSizes[file])
} else {
// 主机节点中转
err = fs.UploadFromPath(ctx, file, dst, true)
}
if err != nil {
job.SetErrorMsg("文件转存失败", err)
}
@@ -114,15 +135,16 @@ func (job *TransferTask) Do() {
// Recycle 回收临时文件
func (job *TransferTask) Recycle() {
err := os.RemoveAll(job.TaskProps.Parent)
if err != nil {
util.Log().Warning("无法删除中转临时目录[%s], %s", job.TaskProps.Parent, err)
if job.TaskProps.NodeID == 1 {
err := os.RemoveAll(job.TaskProps.Parent)
if err != nil {
util.Log().Warning("无法删除中转临时目录[%s], %s", job.TaskProps.Parent, err)
}
}
}
// NewTransferTask 新建中转任务
func NewTransferTask(user uint, src []string, dst, parent string, trim bool) (Job, error) {
func NewTransferTask(user uint, src []string, dst, parent string, trim bool, node uint, sizes map[string]uint64) (Job, error) {
creator, err := model.GetActiveUserByID(user)
if err != nil {
return nil, err
@@ -135,6 +157,8 @@ func NewTransferTask(user uint, src []string, dst, parent string, trim bool) (Jo
Parent: parent,
Dst: dst,
TrimPath: trim,
NodeID: node,
SrcSizes: sizes,
},
}