by Robin at 20240201;

This commit is contained in:
Leufolium 2024-02-01 09:02:11 +08:00
parent 9430918b7b
commit f1ffbf473a
10 changed files with 206 additions and 8 deletions

View File

@ -18,13 +18,18 @@ import (
) )
type Mysql struct { type Mysql struct {
dbVas *sqlx.DB dbVas *sqlx.DB
dbXxlJob *sqlx.DB
} }
func (m *Mysql) getDBVas() *sqlx.DB { func (m *Mysql) getDBVas() *sqlx.DB {
return m.dbVas return m.dbVas
} }
func (m *Mysql) getDBXxlJob() *sqlx.DB {
return m.dbXxlJob
}
func (m *Mysql) VasBegin(ctx *gin.Context) (tx *sqlx.Tx, err error) { func (m *Mysql) VasBegin(ctx *gin.Context) (tx *sqlx.Tx, err error) {
tx, err = m.dbVas.BeginTxx(ctx, nil) tx, err = m.dbVas.BeginTxx(ctx, nil)
if err != nil { if err != nil {
@ -71,6 +76,9 @@ const (
TableConsumeHistoryWithdraw = "vas_ch_withdraw" // 提现明细 TableConsumeHistoryWithdraw = "vas_ch_withdraw" // 提现明细
TableVasUserUnlock = "vas_user_unlock" // 用增解锁 TableVasUserUnlock = "vas_user_unlock" // 用增解锁
TableWithdrawOrder = "vas_withdraw_order" // 提现订单表 TableWithdrawOrder = "vas_withdraw_order" // 提现订单表
DatabaseXxlJob = "xxl_job"
TableXxlJobLog = "xxl_job_log" // xxl_job日志表
) )
func (m *Mysql) ChTableName(ch *dbstruct.ConsumeHistory) (string, error) { func (m *Mysql) ChTableName(ch *dbstruct.ConsumeHistory) (string, error) {
@ -104,6 +112,21 @@ func NewMysql(cfg *conf.ConfigSt) (mysql *Mysql, err error) {
logger.Error("MysqlDB ping fail, cfg: %v, db: %v, err: %v", util.ToJson(cfg.MixMongo), DatabaseVas, err) logger.Error("MysqlDB ping fail, cfg: %v, db: %v, err: %v", util.ToJson(cfg.MixMongo), DatabaseVas, err)
return return
} }
mysql.dbXxlJob, err = mysqldb.NewMysqlDB(cfg.MixMysql, DatabaseXxlJob)
if err != nil {
logger.Error("NewMysqlDB fail, cfg: %v, db: %v, err: %v", util.ToJson(cfg.MixMongo), DatabaseXxlJob, err)
return
}
mysql.dbXxlJob.SetMaxOpenConns(20)
mysql.dbXxlJob.SetMaxIdleConns(10)
err = mysql.dbXxlJob.Ping()
if err != nil {
logger.Error("MysqlDB ping fail, cfg: %v, db: %v, err: %v", util.ToJson(cfg.MixMongo), DatabaseXxlJob, err)
return
}
return return
} }
@ -712,3 +735,42 @@ func (m *Mysql) GetWithdrawOrdersByMid(ctx *gin.Context, tx *sqlx.Tx, mid, st, e
} }
return return
} }
// 获取指定任务中所有已经执行成功的xxl_job任务
func (m *Mysql) GetSuccessXxlJobLogs(ctx *gin.Context, tx *sqlx.Tx, jobId int64, errorPrefix string) (list []*dbstruct.XxlJobLog, err error) {
list = make([]*dbstruct.XxlJobLog, 0)
sqlStr := fmt.Sprintf("select * from %s where job_id = ? and trigger_code = 200 and handle_code = 200 and trigger_msg not like (?%%)", TableXxlJobLog)
if tx != nil {
err = tx.SelectContext(ctx, &list, sqlStr, jobId, errorPrefix)
} else {
db := m.getDBXxlJob()
err = db.SelectContext(ctx, &list, sqlStr, jobId, errorPrefix)
}
if err == sql.ErrNoRows {
err = nil
return
}
if err != nil {
return
}
return
}
// 删除指定任务中所有已经执行成功的xxl_job任务
func (m *Mysql) DeleteSuccessXxlJobLogs(ctx *gin.Context, tx *sqlx.Tx, ids []int64) (err error) {
sqlStr := fmt.Sprintf("delete * from %s where id in (%s)", TableXxlJobLog, util.Convert2SqlArr(ids))
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr)
} else {
db := m.getDBXxlJob()
_, err = db.ExecContext(ctx, sqlStr)
}
if err == sql.ErrNoRows {
err = nil
return
}
if err != nil {
return
}
return
}

View File

@ -51,6 +51,7 @@ func (s *CronService) Init(c any) (err error) {
exec.RegTask("image_audit_batch_his", s.ImageAuditBatchHis) exec.RegTask("image_audit_batch_his", s.ImageAuditBatchHis)
exec.RegTask("text_audit_batch", s.TextAuditBatch) exec.RegTask("text_audit_batch", s.TextAuditBatch)
exec.RegTask("text_audit_batch_his", s.TextAuditBatchHis) exec.RegTask("text_audit_batch_his", s.TextAuditBatchHis)
exec.RegTask("clear_content_audit_batch_execution_logs", s.ClearContentAuditBatchExecutionLogs)
exec.LogHandler(customLogHandle) exec.LogHandler(customLogHandle)
//注册任务handler //注册任务handler

View File

@ -0,0 +1,70 @@
package logic
import (
"bufio"
"fmt"
"os"
"service/app/mix/dao"
"service/bizcommon/util"
"service/library/configcenter"
"service/library/logger"
"time"
"github.com/gin-gonic/gin"
)
type XxlJob struct {
store *dao.Store
cfg *configcenter.XxlJobConfig
}
func NewXxlJob(store *dao.Store, cfg *configcenter.XxlJobConfig) (x *XxlJob) {
return &XxlJob{
store: store,
cfg: cfg,
}
}
func (x *XxlJob) ClearSuccXxlJobLogs(ctx *gin.Context, jobId int64, errorPrefix string) (err error) {
// 查出当天所有已成功的日志
list, err := x.store.GetSuccessXxlJobLogs(ctx, nil, jobId, errorPrefix)
if err != nil {
logger.Error("GetSuccessXxlJobLogs fail, err: %v, jodId: %v, errorPrefix: %v", err, jobId, errorPrefix)
return
}
ids := make([]int64, 0)
// 将当天所有已成功的日志存入文件
logFormatStr := "trigger_time:%v, handle_time:%v, id:%v, job_group:%v, job_id:%v, executor_address:%v, executor_handler:%v, trigger_msg:%v, handle_msg:%v"
logtime := time.Unix(time.Now().Unix()-86400, 0)
logPath := fmt.Sprintf("%s/xxl_job_log_%d%02d%02d", x.cfg.LogPath, logtime.Year(), logtime.Month(), logtime.Day())
outfile, err := os.OpenFile(logPath, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
logger.Error("Open File Err: %v", err)
}
defer outfile.Close()
writer := bufio.NewWriter(outfile)
for _, xxlJobLog := range list {
ids = append(ids, util.DerefInt64(xxlJobLog.Id))
triggerTime := util.DerefString(xxlJobLog.TriggerTime)
handleTime := util.DerefString(xxlJobLog.HandleTime)
id := util.DerefInt64(xxlJobLog.Id)
jobGroup := util.DerefInt32(xxlJobLog.JobGroup)
jobId := util.DerefInt32(xxlJobLog.JobId)
executorAddress := util.DerefString(xxlJobLog.ExecutorAddress)
executorHandler := util.DerefString(xxlJobLog.ExecutorHandler)
triggerMsg := util.DerefString(xxlJobLog.TriggerMsg)
handleMsg := util.DerefString(xxlJobLog.HandleMsg)
str := fmt.Sprintf(logFormatStr, triggerTime, handleTime, id, jobGroup, jobId, executorAddress, executorHandler, triggerMsg, handleMsg)
writer.WriteString(str)
}
writer.Flush()
// 删除表内已成功的日志记录
err = x.store.DeleteSuccessXxlJobLogs(ctx, nil, ids)
if err != nil {
logger.Error("DeleteSuccessXxlJobLogs fail, err: %v, ids: %v", err, ids)
return
}
return
}

View File

@ -94,6 +94,7 @@ var (
_DefaultTextAuditTask *logic.TextAuditTask _DefaultTextAuditTask *logic.TextAuditTask
_DefaultContactCustomerServiceSession *logic.ContactCustomerServiceSession _DefaultContactCustomerServiceSession *logic.ContactCustomerServiceSession
_DefaultDailyStatement *logic.DailyStatement _DefaultDailyStatement *logic.DailyStatement
_DefaultXxlJob *logic.XxlJob
) )
type Service struct { type Service struct {
@ -160,6 +161,7 @@ func (s *Service) Init(c any) (err error) {
_DefaultVas = logic.NewVas(store, _DefaultStreamer) _DefaultVas = logic.NewVas(store, _DefaultStreamer)
_DefaultContactCustomerServiceSession = logic.NewContactCustomerServiceSession(store) _DefaultContactCustomerServiceSession = logic.NewContactCustomerServiceSession(store)
_DefaultDailyStatement = logic.NewDailyStatement(store) _DefaultDailyStatement = logic.NewDailyStatement(store)
_DefaultXxlJob = logic.NewXxlJob(store, cfg.XxlJob)
return return
} }

View File

@ -15,6 +15,8 @@ import (
"service/library/contentaudit/textaudit" "service/library/contentaudit/textaudit"
"service/library/logger" "service/library/logger"
"service/library/redis" "service/library/redis"
"strconv"
"strings"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@ -132,7 +134,7 @@ func (s *CronService) ImageAuditBatch(ctx context.Context, param *xxl.RunReq) (m
// 执行图像审核 // 执行图像审核
successNum, failNum, err := imageaudit.Run(batchId) successNum, failNum, err := imageaudit.Run(batchId)
if err != nil { if err != nil {
return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) return fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)
} }
logger.Info("Image audit batch ends...") logger.Info("Image audit batch ends...")
return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum) return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)
@ -145,7 +147,7 @@ func (s *CronService) ImageAuditBatchHis(ctx context.Context, param *xxl.RunReq)
// 执行图像审核 // 执行图像审核
successNum, failNum, err := imageaudit.Run(batchId) successNum, failNum, err := imageaudit.Run(batchId)
if err != nil { if err != nil {
return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) return fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)
} }
logger.Info("Image audit batch ends...") logger.Info("Image audit batch ends...")
return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum) return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)
@ -158,7 +160,7 @@ func (s *CronService) TextAuditBatch(ctx context.Context, param *xxl.RunReq) (ms
// 执行图像审核 // 执行图像审核
successNum, failNum, err := textaudit.Run(batchId) successNum, failNum, err := textaudit.Run(batchId)
if err != nil { if err != nil {
return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) return fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)
} }
logger.Info("Text audit batch ends...") logger.Info("Text audit batch ends...")
return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum) return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)
@ -171,8 +173,32 @@ func (s *CronService) TextAuditBatchHis(ctx context.Context, param *xxl.RunReq)
// 执行图像审核 // 执行图像审核
successNum, failNum, err := textaudit.Run(batchId) successNum, failNum, err := textaudit.Run(batchId)
if err != nil { if err != nil {
return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) return fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)
} }
logger.Info("Text audit batch ends...") logger.Info("Text audit batch ends...")
return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum) return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)
} }
// 每天0点定时清空图像和文字审核作业的日志
func (s *CronService) ClearContentAuditBatchExecutionLogs(ctx context.Context, param *xxl.RunReq) (msg string) {
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
inParams := strings.Split(param.ExecutorParams, "|")
if len(inParams) < 2 {
return "Wrong executor params format!"
}
jobId := inParams[0]
errorPrefix := inParams[1]
jobIdVal, err := strconv.Atoi(jobId)
if err != nil {
return err.Error()
}
err = _DefaultXxlJob.ClearSuccXxlJobLogs(&gin.Context{}, int64(jobIdVal), errorPrefix)
if err != nil {
return err.Error()
}
return "clear content audit batch execution logs finished..."
}

19
dbstruct/xxljob_mysql.go Normal file
View File

@ -0,0 +1,19 @@
package dbstruct
type XxlJobLog struct {
Id *int64 `json:"id" db:"id"`
JobGroup *int32 `json:"job_group" db:"job_group"`
JobId *int32 `json:"job_id" db:"job_id"`
ExecutorAddress *string `json:"executor_address" db:"executor_address"`
ExecutorHandler *string `json:"executor_handler" db:"executor_handler"`
ExecutorParam *string `json:"executor_param" db:"executor_param"`
ExecutorShardingParam *string `json:"executor_sharding_param" db:"executor_sharding_param"`
ExecutorFailRetryCount *int32 `json:"executor_fail_retry_count" db:"executor_fail_retry_count"`
TriggerTime *string `json:"trigger_time" db:"trigger_time"`
TriggerCode *int32 `json:"trigger_code" db:"trigger_code"`
TriggerMsg *string `json:"trigger_msg" db:"trigger_msg"`
HandleTime *string `json:"handle_time" db:"handle_time"`
HandleCode *int32 `json:"handle_code" db:"handle_code"`
HandleMsg *string `json:"handle_msg" db:"handle_msg"`
AlarmStatus *int32 `json:"alarm_status" db:"alarm_status"`
}

View File

@ -83,4 +83,12 @@ redis:
prefix: "" prefix: ""
server_info: server_info:
file_server_domain_name: "https://file.wishpaldev.tech/" file_server_domain_name: "https://file.wishpaldev.tech/"
xxl_job:
server_addr: "http://127.0.0.1:9800/xxl-job-admin"
access_token: "default_token"
executor_ip: "127.0.0.1"
executor_port: "9801"
registry_key: "golang-jobs-executor"
log_path: "/Users/erwin/log/wishpal-ironfan/"

View File

@ -91,4 +91,12 @@ redis:
prefix: "" prefix: ""
server_info: server_info:
file_server_domain_name: "https://file.tiefen.fun/" file_server_domain_name: "https://file.tiefen.fun/"
xxl_job:
server_addr: "http://127.0.0.1:9800/xxl-job-admin"
access_token: "default_token"
executor_ip: "127.0.0.1"
executor_port: "9801"
registry_key: "golang-jobs-executor"
log_path: "/app/ironfan/log/xxl_job/"

View File

@ -97,4 +97,5 @@ xxl_job:
access_token: "default_token" access_token: "default_token"
executor_ip: "127.0.0.1" executor_ip: "127.0.0.1"
executor_port: "9801" executor_port: "9801"
registry_key: "golang-jobs-executor" registry_key: "golang-jobs-executor"
log_path: "/Users/erwin/log/wishpal-ironfan/xxl_job"

View File

@ -141,6 +141,7 @@ type XxlJobConfig struct {
ExecutorIp string `json:"executor_ip" yaml:"executor_ip"` ExecutorIp string `json:"executor_ip" yaml:"executor_ip"`
ExecutorPort string `json:"executor_port" yaml:"executor_port"` ExecutorPort string `json:"executor_port" yaml:"executor_port"`
RegistryKey string `json:"registry_key" yaml:"registry_key"` RegistryKey string `json:"registry_key" yaml:"registry_key"`
LogPath string `json:"log_path" yaml:"log_path"`
} }
func LoadConfig(configFilePath string, cfg interface{}) error { func LoadConfig(configFilePath string, cfg interface{}) error {