From f1ffbf473a5f2d80a67f2b70d8d8480859b2b7a4 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Thu, 1 Feb 2024 09:02:11 +0800 Subject: [PATCH] by Robin at 20240201; --- app/mix/dao/mysql.go | 64 ++++++++++++++++++++++++- app/mix/service/cronservice.go | 1 + app/mix/service/logic/xxljob.go | 70 ++++++++++++++++++++++++++++ app/mix/service/service.go | 2 + app/mix/service/xxljob_tasks.go | 34 ++++++++++++-- dbstruct/xxljob_mysql.go | 19 ++++++++ etc/mix/mix-local.yaml | 10 +++- etc/mix/mix-prod.yaml | 10 +++- etc/mix/mix-test.yaml | 3 +- library/configcenter/configcenter.go | 1 + 10 files changed, 206 insertions(+), 8 deletions(-) create mode 100644 app/mix/service/logic/xxljob.go create mode 100644 dbstruct/xxljob_mysql.go diff --git a/app/mix/dao/mysql.go b/app/mix/dao/mysql.go index bd1296b3..819c769a 100644 --- a/app/mix/dao/mysql.go +++ b/app/mix/dao/mysql.go @@ -18,13 +18,18 @@ import ( ) type Mysql struct { - dbVas *sqlx.DB + dbVas *sqlx.DB + dbXxlJob *sqlx.DB } func (m *Mysql) getDBVas() *sqlx.DB { return m.dbVas } +func (m *Mysql) getDBXxlJob() *sqlx.DB { + return m.dbXxlJob +} + func (m *Mysql) VasBegin(ctx *gin.Context) (tx *sqlx.Tx, err error) { tx, err = m.dbVas.BeginTxx(ctx, nil) if err != nil { @@ -71,6 +76,9 @@ const ( TableConsumeHistoryWithdraw = "vas_ch_withdraw" // 提现明细 TableVasUserUnlock = "vas_user_unlock" // 用增解锁 TableWithdrawOrder = "vas_withdraw_order" // 提现订单表 + + DatabaseXxlJob = "xxl_job" + TableXxlJobLog = "xxl_job_log" // xxl_job日志表 ) func (m *Mysql) ChTableName(ch *dbstruct.ConsumeHistory) (string, error) { @@ -104,6 +112,21 @@ func NewMysql(cfg *conf.ConfigSt) (mysql *Mysql, err error) { logger.Error("MysqlDB ping fail, cfg: %v, db: %v, err: %v", util.ToJson(cfg.MixMongo), DatabaseVas, err) return } + + mysql.dbXxlJob, err = mysqldb.NewMysqlDB(cfg.MixMysql, DatabaseXxlJob) + if err != nil { + logger.Error("NewMysqlDB fail, cfg: %v, db: %v, err: %v", util.ToJson(cfg.MixMongo), DatabaseXxlJob, err) + return + } + mysql.dbXxlJob.SetMaxOpenConns(20) + mysql.dbXxlJob.SetMaxIdleConns(10) + + err = mysql.dbXxlJob.Ping() + if err != nil { + logger.Error("MysqlDB ping fail, cfg: %v, db: %v, err: %v", util.ToJson(cfg.MixMongo), DatabaseXxlJob, err) + return + } + return } @@ -712,3 +735,42 @@ func (m *Mysql) GetWithdrawOrdersByMid(ctx *gin.Context, tx *sqlx.Tx, mid, st, e } return } + +// 获取指定任务中所有已经执行成功的xxl_job任务 +func (m *Mysql) GetSuccessXxlJobLogs(ctx *gin.Context, tx *sqlx.Tx, jobId int64, errorPrefix string) (list []*dbstruct.XxlJobLog, err error) { + list = make([]*dbstruct.XxlJobLog, 0) + sqlStr := fmt.Sprintf("select * from %s where job_id = ? and trigger_code = 200 and handle_code = 200 and trigger_msg not like (?%%)", TableXxlJobLog) + if tx != nil { + err = tx.SelectContext(ctx, &list, sqlStr, jobId, errorPrefix) + } else { + db := m.getDBXxlJob() + err = db.SelectContext(ctx, &list, sqlStr, jobId, errorPrefix) + } + if err == sql.ErrNoRows { + err = nil + return + } + if err != nil { + return + } + return +} + +// 删除指定任务中所有已经执行成功的xxl_job任务 +func (m *Mysql) DeleteSuccessXxlJobLogs(ctx *gin.Context, tx *sqlx.Tx, ids []int64) (err error) { + sqlStr := fmt.Sprintf("delete * from %s where id in (%s)", TableXxlJobLog, util.Convert2SqlArr(ids)) + if tx != nil { + _, err = tx.ExecContext(ctx, sqlStr) + } else { + db := m.getDBXxlJob() + _, err = db.ExecContext(ctx, sqlStr) + } + if err == sql.ErrNoRows { + err = nil + return + } + if err != nil { + return + } + return +} diff --git a/app/mix/service/cronservice.go b/app/mix/service/cronservice.go index 85e1156f..f7d95bc3 100644 --- a/app/mix/service/cronservice.go +++ b/app/mix/service/cronservice.go @@ -51,6 +51,7 @@ func (s *CronService) Init(c any) (err error) { exec.RegTask("image_audit_batch_his", s.ImageAuditBatchHis) exec.RegTask("text_audit_batch", s.TextAuditBatch) exec.RegTask("text_audit_batch_his", s.TextAuditBatchHis) + exec.RegTask("clear_content_audit_batch_execution_logs", s.ClearContentAuditBatchExecutionLogs) exec.LogHandler(customLogHandle) //注册任务handler diff --git a/app/mix/service/logic/xxljob.go b/app/mix/service/logic/xxljob.go new file mode 100644 index 00000000..4782528b --- /dev/null +++ b/app/mix/service/logic/xxljob.go @@ -0,0 +1,70 @@ +package logic + +import ( + "bufio" + "fmt" + "os" + "service/app/mix/dao" + "service/bizcommon/util" + "service/library/configcenter" + "service/library/logger" + "time" + + "github.com/gin-gonic/gin" +) + +type XxlJob struct { + store *dao.Store + cfg *configcenter.XxlJobConfig +} + +func NewXxlJob(store *dao.Store, cfg *configcenter.XxlJobConfig) (x *XxlJob) { + return &XxlJob{ + store: store, + cfg: cfg, + } +} + +func (x *XxlJob) ClearSuccXxlJobLogs(ctx *gin.Context, jobId int64, errorPrefix string) (err error) { + // 查出当天所有已成功的日志 + list, err := x.store.GetSuccessXxlJobLogs(ctx, nil, jobId, errorPrefix) + if err != nil { + logger.Error("GetSuccessXxlJobLogs fail, err: %v, jodId: %v, errorPrefix: %v", err, jobId, errorPrefix) + return + } + ids := make([]int64, 0) + + // 将当天所有已成功的日志存入文件 + logFormatStr := "trigger_time:%v, handle_time:%v, id:%v, job_group:%v, job_id:%v, executor_address:%v, executor_handler:%v, trigger_msg:%v, handle_msg:%v" + logtime := time.Unix(time.Now().Unix()-86400, 0) + logPath := fmt.Sprintf("%s/xxl_job_log_%d%02d%02d", x.cfg.LogPath, logtime.Year(), logtime.Month(), logtime.Day()) + outfile, err := os.OpenFile(logPath, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + logger.Error("Open File Err: %v", err) + } + defer outfile.Close() + writer := bufio.NewWriter(outfile) + for _, xxlJobLog := range list { + ids = append(ids, util.DerefInt64(xxlJobLog.Id)) + triggerTime := util.DerefString(xxlJobLog.TriggerTime) + handleTime := util.DerefString(xxlJobLog.HandleTime) + id := util.DerefInt64(xxlJobLog.Id) + jobGroup := util.DerefInt32(xxlJobLog.JobGroup) + jobId := util.DerefInt32(xxlJobLog.JobId) + executorAddress := util.DerefString(xxlJobLog.ExecutorAddress) + executorHandler := util.DerefString(xxlJobLog.ExecutorHandler) + triggerMsg := util.DerefString(xxlJobLog.TriggerMsg) + handleMsg := util.DerefString(xxlJobLog.HandleMsg) + str := fmt.Sprintf(logFormatStr, triggerTime, handleTime, id, jobGroup, jobId, executorAddress, executorHandler, triggerMsg, handleMsg) + writer.WriteString(str) + } + writer.Flush() + + // 删除表内已成功的日志记录 + err = x.store.DeleteSuccessXxlJobLogs(ctx, nil, ids) + if err != nil { + logger.Error("DeleteSuccessXxlJobLogs fail, err: %v, ids: %v", err, ids) + return + } + return +} diff --git a/app/mix/service/service.go b/app/mix/service/service.go index 4c3f88f4..3ef7c49e 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -94,6 +94,7 @@ var ( _DefaultTextAuditTask *logic.TextAuditTask _DefaultContactCustomerServiceSession *logic.ContactCustomerServiceSession _DefaultDailyStatement *logic.DailyStatement + _DefaultXxlJob *logic.XxlJob ) type Service struct { @@ -160,6 +161,7 @@ func (s *Service) Init(c any) (err error) { _DefaultVas = logic.NewVas(store, _DefaultStreamer) _DefaultContactCustomerServiceSession = logic.NewContactCustomerServiceSession(store) _DefaultDailyStatement = logic.NewDailyStatement(store) + _DefaultXxlJob = logic.NewXxlJob(store, cfg.XxlJob) return } diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go index df67fd28..560c1aee 100644 --- a/app/mix/service/xxljob_tasks.go +++ b/app/mix/service/xxljob_tasks.go @@ -15,6 +15,8 @@ import ( "service/library/contentaudit/textaudit" "service/library/logger" "service/library/redis" + "strconv" + "strings" "time" "github.com/gin-gonic/gin" @@ -132,7 +134,7 @@ func (s *CronService) ImageAuditBatch(ctx context.Context, param *xxl.RunReq) (m // 执行图像审核 successNum, failNum, err := imageaudit.Run(batchId) if err != nil { - return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) + return fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) } logger.Info("Image audit batch ends...") return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum) @@ -145,7 +147,7 @@ func (s *CronService) ImageAuditBatchHis(ctx context.Context, param *xxl.RunReq) // 执行图像审核 successNum, failNum, err := imageaudit.Run(batchId) if err != nil { - return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) + return fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) } logger.Info("Image audit batch ends...") return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum) @@ -158,7 +160,7 @@ func (s *CronService) TextAuditBatch(ctx context.Context, param *xxl.RunReq) (ms // 执行图像审核 successNum, failNum, err := textaudit.Run(batchId) if err != nil { - return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) + return fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) } logger.Info("Text audit batch ends...") return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum) @@ -171,8 +173,32 @@ func (s *CronService) TextAuditBatchHis(ctx context.Context, param *xxl.RunReq) // 执行图像审核 successNum, failNum, err := textaudit.Run(batchId) if err != nil { - return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) + return fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) } logger.Info("Text audit batch ends...") return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum) } + +// 每天0点定时清空图像和文字审核作业的日志 +func (s *CronService) ClearContentAuditBatchExecutionLogs(ctx context.Context, param *xxl.RunReq) (msg string) { + logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) + + inParams := strings.Split(param.ExecutorParams, "|") + if len(inParams) < 2 { + return "Wrong executor params format!" + } + jobId := inParams[0] + errorPrefix := inParams[1] + + jobIdVal, err := strconv.Atoi(jobId) + if err != nil { + return err.Error() + } + + err = _DefaultXxlJob.ClearSuccXxlJobLogs(&gin.Context{}, int64(jobIdVal), errorPrefix) + if err != nil { + return err.Error() + } + + return "clear content audit batch execution logs finished..." +} diff --git a/dbstruct/xxljob_mysql.go b/dbstruct/xxljob_mysql.go new file mode 100644 index 00000000..a4aceddd --- /dev/null +++ b/dbstruct/xxljob_mysql.go @@ -0,0 +1,19 @@ +package dbstruct + +type XxlJobLog struct { + Id *int64 `json:"id" db:"id"` + JobGroup *int32 `json:"job_group" db:"job_group"` + JobId *int32 `json:"job_id" db:"job_id"` + ExecutorAddress *string `json:"executor_address" db:"executor_address"` + ExecutorHandler *string `json:"executor_handler" db:"executor_handler"` + ExecutorParam *string `json:"executor_param" db:"executor_param"` + ExecutorShardingParam *string `json:"executor_sharding_param" db:"executor_sharding_param"` + ExecutorFailRetryCount *int32 `json:"executor_fail_retry_count" db:"executor_fail_retry_count"` + TriggerTime *string `json:"trigger_time" db:"trigger_time"` + TriggerCode *int32 `json:"trigger_code" db:"trigger_code"` + TriggerMsg *string `json:"trigger_msg" db:"trigger_msg"` + HandleTime *string `json:"handle_time" db:"handle_time"` + HandleCode *int32 `json:"handle_code" db:"handle_code"` + HandleMsg *string `json:"handle_msg" db:"handle_msg"` + AlarmStatus *int32 `json:"alarm_status" db:"alarm_status"` +} diff --git a/etc/mix/mix-local.yaml b/etc/mix/mix-local.yaml index 36bcec73..7b2980e5 100644 --- a/etc/mix/mix-local.yaml +++ b/etc/mix/mix-local.yaml @@ -83,4 +83,12 @@ redis: prefix: "" server_info: - file_server_domain_name: "https://file.wishpaldev.tech/" \ No newline at end of file + file_server_domain_name: "https://file.wishpaldev.tech/" + +xxl_job: + server_addr: "http://127.0.0.1:9800/xxl-job-admin" + access_token: "default_token" + executor_ip: "127.0.0.1" + executor_port: "9801" + registry_key: "golang-jobs-executor" + log_path: "/Users/erwin/log/wishpal-ironfan/" \ No newline at end of file diff --git a/etc/mix/mix-prod.yaml b/etc/mix/mix-prod.yaml index 7f875bd7..f2c6e368 100644 --- a/etc/mix/mix-prod.yaml +++ b/etc/mix/mix-prod.yaml @@ -91,4 +91,12 @@ redis: prefix: "" server_info: - file_server_domain_name: "https://file.tiefen.fun/" \ No newline at end of file + file_server_domain_name: "https://file.tiefen.fun/" + +xxl_job: + server_addr: "http://127.0.0.1:9800/xxl-job-admin" + access_token: "default_token" + executor_ip: "127.0.0.1" + executor_port: "9801" + registry_key: "golang-jobs-executor" + log_path: "/app/ironfan/log/xxl_job/" \ No newline at end of file diff --git a/etc/mix/mix-test.yaml b/etc/mix/mix-test.yaml index 3158ac12..f5c50732 100644 --- a/etc/mix/mix-test.yaml +++ b/etc/mix/mix-test.yaml @@ -97,4 +97,5 @@ xxl_job: access_token: "default_token" executor_ip: "127.0.0.1" executor_port: "9801" - registry_key: "golang-jobs-executor" \ No newline at end of file + registry_key: "golang-jobs-executor" + log_path: "/Users/erwin/log/wishpal-ironfan/xxl_job" \ No newline at end of file diff --git a/library/configcenter/configcenter.go b/library/configcenter/configcenter.go index ca59e20d..533fe409 100644 --- a/library/configcenter/configcenter.go +++ b/library/configcenter/configcenter.go @@ -141,6 +141,7 @@ type XxlJobConfig struct { ExecutorIp string `json:"executor_ip" yaml:"executor_ip"` ExecutorPort string `json:"executor_port" yaml:"executor_port"` RegistryKey string `json:"registry_key" yaml:"registry_key"` + LogPath string `json:"log_path" yaml:"log_path"` } func LoadConfig(configFilePath string, cfg interface{}) error {