Feat: cron / Fix: users status check

This commit is contained in:
HFO4
2020-02-15 14:02:21 +08:00
parent 44d6ca487c
commit faf46745bc
22 changed files with 503 additions and 28 deletions

14
pkg/cache/memo.go vendored
View File

@@ -1,6 +1,7 @@
package cache
import (
"github.com/HFO4/cloudreve/pkg/util"
"sync"
"time"
)
@@ -46,6 +47,19 @@ func getValue(item interface{}, ok bool) (interface{}, bool) {
}
// GarbageCollect 回收已过期的缓存
func (store *MemoStore) GarbageCollect() {
store.Store.Range(func(key, value interface{}) bool {
if item, ok := value.(itemWithTTL); ok {
if item.expires > 0 && item.expires < time.Now().Unix() {
util.Log().Debug("回收垃圾[%s]", key.(string))
store.Store.Delete(key)
}
}
return true
})
}
// NewMemoStore 新建内存存储
func NewMemoStore() *MemoStore {
return &MemoStore{

54
pkg/crontab/collect.go Normal file
View File

@@ -0,0 +1,54 @@
package crontab
import (
model "github.com/HFO4/cloudreve/models"
"github.com/HFO4/cloudreve/pkg/cache"
"github.com/HFO4/cloudreve/pkg/util"
"os"
"path/filepath"
"strings"
"time"
)
func garbageCollect() {
// 清理打包下载产生的临时文件
collectArchiveFile()
// 清理过期的内置内存缓存
if store, ok := cache.Store.(*cache.MemoStore); ok {
collectCache(store)
}
util.Log().Info("定时任务 [cron_garbage_collect] 执行完毕")
}
func collectArchiveFile() {
// 读取有效期、目录设置
tempPath := model.GetSettingByName("temp_path")
expires := model.GetIntSetting("download_timeout", 30)
// 列出文件
root := filepath.Join(tempPath, "archive")
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if !info.IsDir() &&
strings.HasPrefix(filepath.Base(path), "archive_") &&
time.Now().Sub(info.ModTime()).Seconds() > float64(expires) {
util.Log().Debug("删除过期打包下载临时文件 [%s]", path)
// 删除符合条件的文件
if err := os.Remove(path); err != nil {
util.Log().Debug("临时文件 [%s] 删除失败 , %s", path, err)
}
}
return nil
})
if err != nil {
util.Log().Debug("[定时任务] 无法列取临时打包目录")
}
}
func collectCache(store *cache.MemoStore) {
util.Log().Debug("清理内存缓存")
store.GarbageCollect()
}

47
pkg/crontab/init.go Normal file
View File

@@ -0,0 +1,47 @@
package crontab
import (
model "github.com/HFO4/cloudreve/models"
"github.com/HFO4/cloudreve/pkg/util"
"github.com/robfig/cron/v3"
)
// Cron 定时任务
var Cron *cron.Cron
// Reload 重新启动定时任务
func Reload() {
if Cron != nil {
Cron.Stop()
}
Init()
}
// Init 初始化定时任务
func Init() {
util.Log().Info("初始化定时任务...")
// 读取cron日程设置
options := model.GetSettingByNames("cron_garbage_collect", "cron_notify_user", "cron_ban_user")
Cron := cron.New()
for k, v := range options {
var handler func()
switch k {
case "cron_garbage_collect":
handler = garbageCollect
case "cron_notify_user":
handler = notifyExpiredVAS
case "cron_ban_user":
handler = banOverusedUser
default:
util.Log().Warning("未知定时任务类型 [%s],跳过", k)
continue
}
if _, err := Cron.AddFunc(v, handler); err != nil {
util.Log().Warning("无法启动定时任务 [%s] , %s", k, err)
}
}
banOverusedUser()
Cron.Start()
}

84
pkg/crontab/vas.go Normal file
View File

@@ -0,0 +1,84 @@
package crontab
import (
model "github.com/HFO4/cloudreve/models"
"github.com/HFO4/cloudreve/pkg/email"
"github.com/HFO4/cloudreve/pkg/util"
)
func notifyExpiredVAS() {
checkStoragePack()
checkUserGroup()
util.Log().Info("定时任务 [cron_notify_user] 执行完毕")
}
// banOverusedUser 封禁超出宽容期的用户
func banOverusedUser() {
users := model.GetTolerantExpiredUser()
for _, user := range users {
// 清除最后通知日期标记
user.ClearNotified()
// 检查容量是否超额
if user.Storage > user.Group.MaxStorage+user.GetAvailablePackSize() {
// 封禁用户
user.SetStatus(model.OveruseBaned)
}
}
}
// checkUserGroup 检查已过期用户组
func checkUserGroup() {
users := model.GetGroupExpiredUsers()
for _, user := range users {
// 将用户回退到初始用户组
user.GroupFallback()
// 重新加载用户
user, _ = model.GetUserByID(user.ID)
// 检查容量是否超额
if user.Storage > user.Group.MaxStorage+user.GetAvailablePackSize() {
// 如果超额,则通知用户
sendNotification(&user, "用户组过期")
// 更新最后通知日期
user.Notified()
}
}
}
// checkStoragePack 检查已过期的容量包
func checkStoragePack() {
packs := model.GetExpiredStoragePack()
for _, pack := range packs {
//找到所属用户
user, err := model.GetUserByID(pack.ID)
if err != nil {
util.Log().Warning("[定时任务] 无法获取用户 [UID=%d] 信息, %s", pack.ID, err)
continue
}
// 检查容量是否超额
if user.Storage > user.Group.MaxStorage+user.GetAvailablePackSize() {
// 如果超额,则通知用户
sendNotification(&user, "容量包过期")
// 删除过期的容量包
pack.Delete()
// 更新最后通知日期
user.Notified()
}
}
}
func sendNotification(user *model.User, reason string) {
title, body := email.NewOveruseNotification(user.Nick, reason)
if err := email.Send(user.Email, title, body); err != nil {
util.Log().Warning("无法发送通知邮件, %s", err)
}
}

38
pkg/email/init.go Normal file
View File

@@ -0,0 +1,38 @@
package email
import model "github.com/HFO4/cloudreve/models"
// Client 默认的邮件发送客户端
var Client Driver
// Init 初始化
func Init() {
if Client != nil {
Client.Close()
}
// 读取SMTP设置
options := model.GetSettingByNames(
"fromName",
"fromAdress",
"smtpHost",
"replyTo",
"smtpUser",
"smtpPass",
)
port := model.GetIntSetting("smtpPort", 25)
keepAlive := model.GetIntSetting("mail_keepalive", 30)
client := NewSMTPClient(SMTPConfig{
Name: options["fromName"],
Address: options["fromAdress"],
ReplyTo: options["replyTo"],
Host: options["smtpHost"],
Port: port,
User: options["smtpUser"],
Password: options["smtpPass"],
Keepalive: keepAlive,
})
Client = client
}

27
pkg/email/mail.go Normal file
View File

@@ -0,0 +1,27 @@
package email
import "errors"
// Driver 邮件发送驱动
type Driver interface {
// Close 关闭驱动
Close()
// Send 发送邮件
Send(to, title, body string) error
}
var (
// ErrChanNotOpen 邮件队列未开启
ErrChanNotOpen = errors.New("邮件队列未开启")
// ErrNoActiveDriver 无可用邮件发送服务
ErrNoActiveDriver = errors.New("无可用邮件发送服务")
)
// Send 发送邮件
func Send(to, title, body string) error {
if Client == nil {
return ErrNoActiveDriver
}
return Client.Send(to, title, body)
}

111
pkg/email/smtp.go Normal file
View File

@@ -0,0 +1,111 @@
package email
import (
"github.com/HFO4/cloudreve/pkg/util"
"github.com/go-mail/mail"
"time"
)
// SMTP SMTP协议发送邮件
type SMTP struct {
Config SMTPConfig
ch chan *mail.Message
chOpen bool
}
// SMTPConfig SMTP发送配置
type SMTPConfig struct {
Name string // 发送者名
Address string // 发送者地址
ReplyTo string // 回复地址
Host string // 服务器主机名
Port int // 服务器端口
User string // 用户名
Password string // 密码
Encryption string // 是否启用加密
Keepalive int // SMTP 连接保留时长
}
// NewSMTPClient 新建SMTP发送队列
func NewSMTPClient(config SMTPConfig) *SMTP {
client := &SMTP{
Config: config,
ch: make(chan *mail.Message, 30),
chOpen: false,
}
client.Init()
return client
}
// Send 发送邮件
func (client *SMTP) Send(to, title, body string) error {
if !client.chOpen {
return ErrChanNotOpen
}
m := mail.NewMessage()
m.SetHeader("From", client.Config.Address)
m.SetHeader("To", to)
m.SetHeader("Subject", title)
m.SetBody("text/html", body)
client.ch <- m
return nil
}
// Close 关闭发送队列
func (client *SMTP) Close() {
if client.ch != nil {
close(client.ch)
}
}
// Init 初始化发送队列
func (client *SMTP) Init() {
go func() {
defer func() {
if err := recover(); err != nil {
client.chOpen = false
util.Log().Error("邮件发送队列出现异常, %s ,30 秒后重新连接", err)
time.Sleep(time.Duration(30) * time.Second)
client.Init()
}
}()
d := mail.NewDialer(client.Config.Host, client.Config.Port, client.Config.User, client.Config.Password)
d.Timeout = time.Duration(client.Config.Keepalive+5) * time.Second
client.chOpen = true
var s mail.SendCloser
var err error
open := false
for {
select {
case m, ok := <-client.ch:
if !ok {
client.chOpen = false
return
}
if !open {
if s, err = d.Dial(); err != nil {
panic(err)
}
open = true
}
if err := mail.Send(s, m); err != nil {
util.Log().Warning("邮件发送失败, %s", err)
} else {
util.Log().Debug("邮件已发送")
}
// 长时间没有新邮件则关闭SMTP连接
case <-time.After(time.Duration(client.Config.Keepalive) * time.Second):
if open {
if err := s.Close(); err != nil {
util.Log().Warning("无法关闭 SMTP 连接 %s", err)
}
open = false
}
}
}
}()
}

21
pkg/email/template.go Normal file
View File

@@ -0,0 +1,21 @@
package email
import (
"fmt"
model "github.com/HFO4/cloudreve/models"
"github.com/HFO4/cloudreve/pkg/util"
)
// NewOveruseNotification 新建超额提醒邮件
func NewOveruseNotification(userName, reason string) (string, string) {
options := model.GetSettingByNames("siteName", "siteURL", "siteTitle", "over_used_template")
replace := map[string]string{
"{siteTitle}": options["siteName"],
"{userName}": userName,
"{notifyReason}": reason,
"{siteUrl}": options["siteURL"],
"{siteSecTitle}": options["siteTitle"],
}
return fmt.Sprintf("【%s】空间容量超额提醒", options["siteName"]),
util.Replace(replace, options["over_used_template"])
}

View File

@@ -115,6 +115,7 @@ func HookResetPolicy(ctx context.Context, fs *FileSystem) error {
}
fs.Policy = originFile.GetPolicy()
fs.User.Policy = *fs.Policy
return fs.DispatchHandler()
}

View File

@@ -137,7 +137,7 @@ func NewCompressTask(user *model.User, dst string, dirs, files []uint) (Job, err
// NewCompressTaskFromModel 从数据库记录中恢复压缩任务
func NewCompressTaskFromModel(task *model.Task) (Job, error) {
user, err := model.GetUserByID(task.UserID)
user, err := model.GetActiveUserByID(task.UserID)
if err != nil {
return nil, err
}

View File

@@ -109,7 +109,7 @@ func NewDecompressTask(user *model.User, src, dst string) (Job, error) {
// NewDecompressTaskFromModel 从数据库记录中恢复压缩任务
func NewDecompressTaskFromModel(task *model.Task) (Job, error) {
user, err := model.GetUserByID(task.UserID)
user, err := model.GetActiveUserByID(task.UserID)
if err != nil {
return nil, err
}

View File

@@ -109,7 +109,7 @@ func (job *TransferTask) Recycle() {
// NewTransferTask 新建中转任务
func NewTransferTask(user uint, src []string, dst, parent string) (Job, error) {
creator, err := model.GetUserByID(user)
creator, err := model.GetActiveUserByID(user)
if err != nil {
return nil, err
}
@@ -134,7 +134,7 @@ func NewTransferTask(user uint, src []string, dst, parent string) (Job, error) {
// NewTransferTaskFromModel 从数据库记录中恢复中转任务
func NewTransferTaskFromModel(task *model.Task) (Job, error) {
user, err := model.GetUserByID(task.UserID)
user, err := model.GetActiveUserByID(task.UserID)
if err != nil {
return nil, err
}

View File

@@ -344,8 +344,8 @@ func (h *Handler) handlePut(w http.ResponseWriter, r *http.Request, fs *filesyst
fs.Use("AfterValidateFailed", filesystem.HookUpdateSourceName)
}
fs.Use("BeforeUpload", filesystem.HookValidateFile)
fs.Use("BeforeUpload", filesystem.HookResetPolicy)
fs.Use("BeforeUpload", filesystem.HookValidateFile)
fs.Use("BeforeUpload", filesystem.HookChangeCapacity)
fs.Use("AfterUploadCanceled", filesystem.HookCleanFileContent)
fs.Use("AfterUploadCanceled", filesystem.HookClearFileSize)