mirror of
https://github.com/halejohn/Cloudreve.git
synced 2026-01-26 09:34:57 +08:00
Feat: handle aria2 download complete
This commit is contained in:
194
pkg/aria2/Monitor.go
Normal file
194
pkg/aria2/Monitor.go
Normal file
@@ -0,0 +1,194 @@
|
||||
package aria2
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
model "github.com/HFO4/cloudreve/models"
|
||||
"github.com/HFO4/cloudreve/pkg/task"
|
||||
"github.com/HFO4/cloudreve/pkg/util"
|
||||
"github.com/zyxar/argo/rpc"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Monitor 离线下载状态监控
|
||||
type Monitor struct {
|
||||
Task *model.Download
|
||||
Interval time.Duration
|
||||
|
||||
notifier chan StatusEvent
|
||||
}
|
||||
|
||||
// StatusEvent 状态改变事件
|
||||
type StatusEvent struct {
|
||||
GID string
|
||||
Status int
|
||||
}
|
||||
|
||||
// NewMonitor 新建上传状态监控
|
||||
func NewMonitor(task *model.Download) {
|
||||
monitor := &Monitor{
|
||||
Task: task,
|
||||
Interval: time.Duration(model.GetIntSetting("aria2_interval", 10)) * time.Second,
|
||||
notifier: make(chan StatusEvent),
|
||||
}
|
||||
go monitor.Loop()
|
||||
EventNotifier.Subscribe(monitor.notifier, monitor.Task.GID)
|
||||
}
|
||||
|
||||
// Loop 开启监控循环
|
||||
func (monitor *Monitor) Loop() {
|
||||
defer EventNotifier.Unsubscribe(monitor.Task.GID)
|
||||
|
||||
// 首次循环立即更新
|
||||
interval := time.Duration(0)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-monitor.notifier:
|
||||
if monitor.Update() {
|
||||
return
|
||||
}
|
||||
case <-time.After(interval):
|
||||
interval = monitor.Interval
|
||||
if monitor.Update() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update 更新状态,返回值表示是否退出监控
|
||||
func (monitor *Monitor) Update() bool {
|
||||
status, err := Instance.Status(monitor.Task)
|
||||
if err != nil {
|
||||
util.Log().Warning("无法获取下载任务[%s]的状态,%s", monitor.Task.GID, err)
|
||||
monitor.setErrorStatus(err)
|
||||
monitor.RemoveTempFolder()
|
||||
return true
|
||||
}
|
||||
|
||||
// 更新任务信息
|
||||
if err := monitor.UpdateTaskInfo(status); err != nil {
|
||||
util.Log().Warning("无法更新下载任务[%s]的任务信息[%s],", monitor.Task.GID, err)
|
||||
return true
|
||||
}
|
||||
|
||||
util.Log().Debug(status.Status)
|
||||
|
||||
switch status.Status {
|
||||
case "complete":
|
||||
return monitor.Complete(status)
|
||||
case "error":
|
||||
return monitor.Error(status)
|
||||
case "active", "waiting", "paused":
|
||||
return false
|
||||
case "removed":
|
||||
return true
|
||||
default:
|
||||
util.Log().Warning("下载任务[%s]返回未知状态信息[%s],", monitor.Task.GID, status.Status)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateTaskInfo 更新数据库中的任务信息
|
||||
func (monitor *Monitor) UpdateTaskInfo(status rpc.StatusInfo) error {
|
||||
monitor.Task.GID = status.Gid
|
||||
monitor.Task.Status = getStatus(status.Status)
|
||||
|
||||
// 文件大小、已下载大小
|
||||
total, err := strconv.ParseUint(status.TotalLength, 10, 64)
|
||||
if err != nil {
|
||||
total = 0
|
||||
}
|
||||
downloaded, err := strconv.ParseUint(status.CompletedLength, 10, 64)
|
||||
if err != nil {
|
||||
downloaded = 0
|
||||
}
|
||||
monitor.Task.TotalSize = total
|
||||
monitor.Task.DownloadedSize = downloaded
|
||||
monitor.Task.GID = status.Gid
|
||||
monitor.Task.Parent = status.Dir
|
||||
|
||||
// 下载速度
|
||||
speed, err := strconv.Atoi(status.DownloadSpeed)
|
||||
if err != nil {
|
||||
speed = 0
|
||||
}
|
||||
|
||||
monitor.Task.Speed = speed
|
||||
if len(status.Files) > 0 {
|
||||
monitor.Task.Path = status.Files[0].Path
|
||||
}
|
||||
attrs, _ := json.Marshal(status)
|
||||
monitor.Task.Attrs = string(attrs)
|
||||
|
||||
return monitor.Task.Save()
|
||||
}
|
||||
|
||||
// Error 任务下载出错处理,返回是否中断监控
|
||||
func (monitor *Monitor) Error(status rpc.StatusInfo) bool {
|
||||
monitor.setErrorStatus(errors.New(status.ErrorMessage))
|
||||
|
||||
// 清理临时文件
|
||||
monitor.RemoveTempFolder()
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// RemoveTempFile 清理下载临时文件
|
||||
func (monitor *Monitor) RemoveTempFile() {
|
||||
err := os.Remove(monitor.Task.Path)
|
||||
if err != nil {
|
||||
util.Log().Warning("无法删除离线下载临时文件[%s], %s", monitor.Task.Path, err)
|
||||
}
|
||||
|
||||
if empty, _ := util.IsEmpty(monitor.Task.Parent); empty {
|
||||
err := os.Remove(monitor.Task.Parent)
|
||||
if err != nil {
|
||||
util.Log().Warning("无法删除离线下载临时目录[%s], %s", monitor.Task.Parent, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveTempFolder 清理下载临时目录
|
||||
func (monitor *Monitor) RemoveTempFolder() {
|
||||
err := os.RemoveAll(monitor.Task.Parent)
|
||||
if err != nil {
|
||||
util.Log().Warning("无法删除离线下载临时目录[%s], %s", monitor.Task.Parent, err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Complete 完成下载,返回是否中断监控
|
||||
func (monitor *Monitor) Complete(status rpc.StatusInfo) bool {
|
||||
// 创建中转任务
|
||||
job, err := task.NewTransferTask(
|
||||
monitor.Task.UserID,
|
||||
path.Join(monitor.Task.Dst, filepath.Base(monitor.Task.Path)),
|
||||
monitor.Task.Path,
|
||||
monitor.Task.Parent,
|
||||
)
|
||||
if err != nil {
|
||||
monitor.setErrorStatus(err)
|
||||
return true
|
||||
}
|
||||
|
||||
// 提交中转任务
|
||||
task.TaskPoll.Submit(job)
|
||||
|
||||
// 更新任务ID
|
||||
monitor.Task.TaskID = job.Model().ID
|
||||
monitor.Task.Save()
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (monitor *Monitor) setErrorStatus(err error) {
|
||||
monitor.Task.Status = Error
|
||||
monitor.Task.Error = err.Error()
|
||||
monitor.Task.Save()
|
||||
}
|
||||
@@ -4,16 +4,22 @@ import (
|
||||
model "github.com/HFO4/cloudreve/models"
|
||||
"github.com/HFO4/cloudreve/pkg/serializer"
|
||||
"github.com/HFO4/cloudreve/pkg/util"
|
||||
"github.com/zyxar/argo/rpc"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
// Instance 默认使用的Aria2处理实例
|
||||
var Instance Aria2 = &DummyAria2{}
|
||||
|
||||
// EventNotifier 任务状态更新通知处理器
|
||||
var EventNotifier = &Notifier{}
|
||||
|
||||
// Aria2 离线下载处理接口
|
||||
type Aria2 interface {
|
||||
// CreateTask 创建新的任务
|
||||
CreateTask(task *model.Download) error
|
||||
// 返回状态信息
|
||||
Status(task *model.Download) (rpc.StatusInfo, error)
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -26,6 +32,18 @@ const (
|
||||
const (
|
||||
// Ready 准备就绪
|
||||
Ready = iota
|
||||
// Downloading 下载中
|
||||
Downloading
|
||||
// Paused 暂停中
|
||||
Paused
|
||||
// Error 出错
|
||||
Error
|
||||
// Complete 完成
|
||||
Complete
|
||||
// Canceled 取消/停止
|
||||
Canceled
|
||||
// Unknown 未知状态
|
||||
Unknown
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -42,6 +60,11 @@ func (instance *DummyAria2) CreateTask(task *model.Download) error {
|
||||
return ErrNotEnabled
|
||||
}
|
||||
|
||||
// Status 返回未开启错误
|
||||
func (instance *DummyAria2) Status(task *model.Download) (rpc.StatusInfo, error) {
|
||||
return rpc.StatusInfo{}, ErrNotEnabled
|
||||
}
|
||||
|
||||
// Init 初始化
|
||||
func Init() {
|
||||
options := model.GetSettingByNames("aria2_rpcurl", "aria2_token", "aria2_options")
|
||||
@@ -72,4 +95,31 @@ func Init() {
|
||||
}
|
||||
|
||||
Instance = client
|
||||
|
||||
// 从数据库中读取未完成任务,创建监控
|
||||
unfinished := model.GetDownloadsByStatus(Ready, Paused, Downloading)
|
||||
for _, task := range unfinished {
|
||||
// 创建任务监控
|
||||
NewMonitor(&task)
|
||||
}
|
||||
}
|
||||
|
||||
// getStatus 将给定的状态字符串转换为状态标识数字
|
||||
func getStatus(status string) int {
|
||||
switch status {
|
||||
case "complete":
|
||||
return Complete
|
||||
case "active":
|
||||
return Downloading
|
||||
case "waiting":
|
||||
return Ready
|
||||
case "paused":
|
||||
return Paused
|
||||
case "error":
|
||||
return Error
|
||||
case "removed":
|
||||
return Canceled
|
||||
default:
|
||||
return Unknown
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,11 +30,16 @@ func (client *RPCService) Init(server, secret string, timeout int, options []int
|
||||
Options: options,
|
||||
}
|
||||
caller, err := rpc.New(context.Background(), server, secret, time.Duration(timeout)*time.Second,
|
||||
rpc.DummyNotifier{})
|
||||
EventNotifier)
|
||||
client.caller = caller
|
||||
return err
|
||||
}
|
||||
|
||||
// Status 查询下载状态
|
||||
func (client *RPCService) Status(task *model.Download) (rpc.StatusInfo, error) {
|
||||
return client.caller.TellStatus(task.GID)
|
||||
}
|
||||
|
||||
// CreateTask 创建新任务
|
||||
func (client *RPCService) CreateTask(task *model.Download) error {
|
||||
// 生成存储路径
|
||||
@@ -45,7 +50,11 @@ func (client *RPCService) CreateTask(task *model.Download) error {
|
||||
)
|
||||
|
||||
// 创建下载任务
|
||||
gid, err := client.caller.AddURI(task.Source, map[string]string{"dir": task.Path})
|
||||
options := []interface{}{map[string]string{"dir": task.Path}}
|
||||
if len(client.options.Options) > 0 {
|
||||
options = append(options, client.options.Options)
|
||||
}
|
||||
gid, err := client.caller.AddURI(task.Source, options...)
|
||||
if err != nil || gid == "" {
|
||||
return err
|
||||
}
|
||||
@@ -53,6 +62,12 @@ func (client *RPCService) CreateTask(task *model.Download) error {
|
||||
// 保存到数据库
|
||||
task.GID = gid
|
||||
_, err = task.Create()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
// 创建任务监控
|
||||
NewMonitor(task)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
63
pkg/aria2/notification.go
Normal file
63
pkg/aria2/notification.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package aria2
|
||||
|
||||
import (
|
||||
"github.com/zyxar/argo/rpc"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Notifier aria2实践通知处理
|
||||
type Notifier struct {
|
||||
Subscribes sync.Map
|
||||
}
|
||||
|
||||
// Subscribe 订阅事件通知
|
||||
func (notifier *Notifier) Subscribe(target chan StatusEvent, gid string) {
|
||||
notifier.Subscribes.Store(gid, target)
|
||||
}
|
||||
|
||||
// Unsubscribe 取消订阅事件通知
|
||||
func (notifier *Notifier) Unsubscribe(gid string) {
|
||||
notifier.Subscribes.Delete(gid)
|
||||
}
|
||||
|
||||
// Notify 发送通知
|
||||
func (notifier *Notifier) Notify(events []rpc.Event, status int) {
|
||||
for _, event := range events {
|
||||
if target, ok := notifier.Subscribes.Load(event.Gid); ok {
|
||||
target.(chan StatusEvent) <- StatusEvent{
|
||||
GID: event.Gid,
|
||||
Status: status,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// OnDownloadStart 下载开始
|
||||
func (notifier *Notifier) OnDownloadStart(events []rpc.Event) {
|
||||
notifier.Notify(events, Downloading)
|
||||
}
|
||||
|
||||
// OnDownloadPause 下载暂停
|
||||
func (notifier *Notifier) OnDownloadPause(events []rpc.Event) {
|
||||
notifier.Notify(events, Paused)
|
||||
}
|
||||
|
||||
// OnDownloadStop 下载停止
|
||||
func (notifier *Notifier) OnDownloadStop(events []rpc.Event) {
|
||||
notifier.Notify(events, Canceled)
|
||||
}
|
||||
|
||||
// OnDownloadComplete 下载完成
|
||||
func (notifier *Notifier) OnDownloadComplete(events []rpc.Event) {
|
||||
notifier.Notify(events, Complete)
|
||||
}
|
||||
|
||||
// OnDownloadError 下载出错
|
||||
func (notifier *Notifier) OnDownloadError(events []rpc.Event) {
|
||||
notifier.Notify(events, Error)
|
||||
}
|
||||
|
||||
// OnBtDownloadComplete BT下载完成
|
||||
func (notifier *Notifier) OnBtDownloadComplete(events []rpc.Event) {
|
||||
notifier.Notify(events, Complete)
|
||||
}
|
||||
Reference in New Issue
Block a user