diff --git a/app/mix/dao/mysql.go b/app/mix/dao/mysql.go index 84e0eb1c..cd8f6e55 100644 --- a/app/mix/dao/mysql.go +++ b/app/mix/dao/mysql.go @@ -856,6 +856,99 @@ func (m *Mysql) GetSuccessXxlJobLogs(ctx *gin.Context, tx *sqlx.Tx, jobIdsStr st return } +// 更新指定任务的任务信息 +func (m *Mysql) UpdateXxlJobLog(ctx *gin.Context, tx *sqlx.Tx, xxlJobLog *dbstruct.XxlJobLog) (err error) { + + setClauses := make([]string, 0) + setValue := make([]any, 0) + if xxlJobLog.JobGroup != nil { + setClauses = append(setClauses, " job_group = ? ") + setValue = append(setValue, xxlJobLog.GetJobGroup()) + } + if xxlJobLog.JobId != nil { + setClauses = append(setClauses, " job_id = ? ") + setValue = append(setValue, xxlJobLog.GetJobId()) + } + if xxlJobLog.ExecutorAddress != nil { + setClauses = append(setClauses, " executor_address = ? ") + setValue = append(setValue, xxlJobLog.GetExecutorAddress()) + } + if xxlJobLog.ExecutorHandler != nil { + setClauses = append(setClauses, " executor_handler = ? ") + setValue = append(setValue, xxlJobLog.GetExecutorHandler()) + } + if xxlJobLog.ExecutorParam != nil { + setClauses = append(setClauses, " executor_param = ? ") + setValue = append(setValue, xxlJobLog.GetExecutorParam()) + } + if xxlJobLog.ExecutorShardingParam != nil { + setClauses = append(setClauses, " executor_sharding_param = ? ") + setValue = append(setValue, xxlJobLog.GetExecutorShardingParam()) + } + if xxlJobLog.ExecutorFailRetryCount != nil { + setClauses = append(setClauses, " executor_fail_retry_count = ? ") + setValue = append(setValue, xxlJobLog.GetExecutorFailRetryCount()) + } + if xxlJobLog.TriggerTime != nil { + setClauses = append(setClauses, " trigger_time = ? ") + setValue = append(setValue, xxlJobLog.GetTriggerTime()) + } + if xxlJobLog.TriggerCode != nil { + setClauses = append(setClauses, " trigger_code = ? ") + setValue = append(setValue, xxlJobLog.GetTriggerCode()) + } + if xxlJobLog.TriggerMsg != nil { + setClauses = append(setClauses, " trigger_msg = ? ") + setValue = append(setValue, xxlJobLog.GetTriggerMsg()) + } + if xxlJobLog.HandleTime != nil { + setClauses = append(setClauses, " handle_time = ? ") + setValue = append(setValue, xxlJobLog.GetHandleTime()) + } + if xxlJobLog.HandleCode != nil { + setClauses = append(setClauses, " handle_code = ? ") + setValue = append(setValue, xxlJobLog.GetHandleCode()) + } + if xxlJobLog.HandleMsg != nil { + setClauses = append(setClauses, " handle_msg = ? ") + setValue = append(setValue, xxlJobLog.GetHandleMsg()) + } + if xxlJobLog.AlarmStatus != nil { + setClauses = append(setClauses, " alarm_status = ? ") + setValue = append(setValue, xxlJobLog.GetAlarmStatus()) + } + + if len(setValue) == 0 { + return + } + + setValue = append(setValue, xxlJobLog.GetId()) + + setClause := &strings.Builder{} + for i, clause := range setClauses { + if i > 0 { + setClause.WriteString(",") + } + setClause.WriteString(clause) + } + + sqlStr := fmt.Sprintf("update %s set %s where id = ?", TableXxlJobLog, setClause) + if tx != nil { + _, err = tx.ExecContext(ctx, sqlStr, setValue...) + } else { + db := m.getDBXxlJob() + _, err = db.ExecContext(ctx, sqlStr, setValue...) + } + if err == sql.ErrNoRows { + err = nil + return + } + if err != nil { + return + } + return +} + // 删除指定任务中所有已经执行成功的xxl_job任务 func (m *Mysql) DeleteSuccessXxlJobLogs(ctx *gin.Context, tx *sqlx.Tx, ids []int64) (err error) { if len(ids) == 0 { diff --git a/app/mix/service/cronservice.go b/app/mix/service/cronservice.go index 18c79e56..691cb46d 100644 --- a/app/mix/service/cronservice.go +++ b/app/mix/service/cronservice.go @@ -52,7 +52,7 @@ func (s *CronService) Init(c any) (err error) { exec.RegTask("image_audit_batch_his", s.ImageAuditBatchHis) exec.RegTask("text_audit_batch", s.TextAuditBatch) exec.RegTask("text_audit_batch_his", s.TextAuditBatchHis) - exec.RegTask("clear_content_audit_batch_execution_logs", s.ClearContentAuditBatchExecutionLogs) + //exec.RegTask("clear_content_audit_batch_execution_logs", s.ClearContentAuditBatchExecutionLogs) exec.LogHandler(customLogHandle) //注册任务handler diff --git a/app/mix/service/imageaudittask_result_handler.go b/app/mix/service/imageaudittask_result_handler.go index e53abd67..405e98e6 100644 --- a/app/mix/service/imageaudittask_result_handler.go +++ b/app/mix/service/imageaudittask_result_handler.go @@ -10,6 +10,7 @@ import ( "service/dbstruct" "github.com/gin-gonic/gin" + goproto "google.golang.org/protobuf/proto" ) var DefaultImageAuditTaskResultHandler *ImageAuditTaskResultHandler @@ -115,47 +116,71 @@ func (handler *ImageAuditTaskResultHandler) generateStreamerAlbumUpdateFunc() { } } -// 动态表->图像内容,若不通过,则删掉审核不通过的图片 +// 动态表->图像内容,若不通过,则删掉审核不通过的图片,图片仅剩一张则删除动态,若通过,则若动态已被删除,恢复该动态至之前状态 func (handler *ImageAuditTaskResultHandler) generateMomentMediaComponentUpdateFunc() { handler.imageAuditTaskUpdateFuncGeneratorMap["moment|moment|media_component"] = func(ctx *gin.Context, task *dbstruct.ImageAuditTask, option int) func() error { return func() error { - momentId := task.AssociativeTableId - var mediaComp *dbstruct.MediaComponent if option == consts.ImageAuditTaskUpdate_Pass { - mediaComp = task.AuditedMedia + return passMomentImageAuditTask(ctx, task) + } else { - auditedImageIds := util.DerefInt64Slice(task.AuditedMedia.ImageIds) - imageIds := make([]int64, 0) - for i, pass := range task.AuditedMediaResults { - if pass { - imageIds = append(imageIds, auditedImageIds[i]) - } - } - if len(imageIds) != 0 { - mediaComp = &dbstruct.MediaComponent{ - ImageIds: &imageIds, - } - } - } - // 如果旧媒体内容为空,则删除这条动态 - if mediaComp == nil { - return _DefaultMoment.OpDelete(ctx, util.DerefInt64(momentId)) - } else { - if err := _DefaultMoment.OpUpdate(ctx, &momentproto.OpUpdateReq{ - Moment: &dbstruct.Moment{ - Id: momentId, - MediaComp: mediaComp, - }, - }); err != nil { - return err - } - if err := _DefaultMomentAuditTask.OpUpdateByImageAuditTaskIds(ctx, &dbstruct.MomentAuditTask{ - FinalMedia: mediaComp, - }, []string{util.DerefString(task.Id)}); err != nil { - return err - } - return nil + return rollbackMomentImageAuditTask(ctx, task) } } } } + +// 通过动态表图像 +func passMomentImageAuditTask(ctx *gin.Context, task *dbstruct.ImageAuditTask) (err error) { + if err := _DefaultMoment.OpUpdate(ctx, &momentproto.OpUpdateReq{ + Moment: &dbstruct.Moment{ + Id: task.AssociativeTableId, + MediaComp: task.AuditedMedia, + DelFlag: goproto.Int64(consts.Exist), + }, + }); err != nil { + return err + } + + if err := _DefaultMomentAuditTask.OpUpdateByImageAuditTaskIds(ctx, &dbstruct.MomentAuditTask{ + FinalMedia: task.AuditedMedia, + }, []string{util.DerefString(task.Id)}); err != nil { + return err + } + return nil +} + +// 回退动态表图像 +func rollbackMomentImageAuditTask(ctx *gin.Context, task *dbstruct.ImageAuditTask) (err error) { + auditedImageIds := util.DerefInt64Slice(task.AuditedMedia.ImageIds) + imageIds := make([]int64, 0) + for i, pass := range task.AuditedMediaResults { + if pass { + imageIds = append(imageIds, auditedImageIds[i]) + } + } + if len(imageIds) != 0 { + mediaComp := &dbstruct.MediaComponent{ + ImageIds: &imageIds, + } + + if err := _DefaultMoment.OpUpdate(ctx, &momentproto.OpUpdateReq{ + Moment: &dbstruct.Moment{ + Id: task.AssociativeTableId, + MediaComp: mediaComp, + }, + }); err != nil { + return err + } + + if err := _DefaultMomentAuditTask.OpUpdateByImageAuditTaskIds(ctx, &dbstruct.MomentAuditTask{ + FinalMedia: mediaComp, + }, []string{util.DerefString(task.Id)}); err != nil { + return err + } + + return nil + } else { + return _DefaultMoment.OpDelete(ctx, util.DerefInt64(task.AssociativeTableId)) + } +} diff --git a/app/mix/service/logic/xxljob.go b/app/mix/service/logic/xxljob.go index 30f09dec..333a2673 100644 --- a/app/mix/service/logic/xxljob.go +++ b/app/mix/service/logic/xxljob.go @@ -6,6 +6,7 @@ import ( "os" "service/app/mix/dao" "service/bizcommon/util" + "service/dbstruct" "service/library/configcenter" "service/library/logger" "time" @@ -25,6 +26,15 @@ func NewXxlJob(store *dao.Store, cfg *configcenter.XxlJobConfig) (x *XxlJob) { } } +func (x *XxlJob) UpdateXxlJobLog(ctx *gin.Context, xxlJobLog *dbstruct.XxlJobLog) (err error) { + err = x.store.UpdateXxlJobLog(ctx, nil, xxlJobLog) + if err != nil { + logger.Error("UpdateXxlJobLog fail, err: %v", err) + return + } + return +} + func (x *XxlJob) ClearSuccXxlJobLogs(ctx *gin.Context, jobIdsStr string, errorPrefix string) (err error) { // 查出当天所有已成功的日志 list, err := x.store.GetSuccessXxlJobLogs(ctx, nil, jobIdsStr, errorPrefix) diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go index 42b0f1de..063a2cc7 100644 --- a/app/mix/service/xxljob_tasks.go +++ b/app/mix/service/xxljob_tasks.go @@ -139,75 +139,146 @@ func (s *CronService) CreateDailyStatement(ctx context.Context, param *xxl.RunRe // 图像审核作业 func (s *CronService) ImageAuditBatch(ctx context.Context, param *xxl.RunReq) (msg string) { logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) - // 刷新批次号 - batchId := imageaudit.RefreshBatchId() - // 执行图像审核 - successNum, failNum, err := imageaudit.Run(batchId) - // 同步图像审核结果 - err1 := DefaultService.utilSyncImageAuditTaskResultByBatchId(&gin.Context{}, batchId) - if err1 != nil { - return fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed to syncronize, err :%v", batchId, err) - } - if err != nil { - return fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) + handleMsg := &strings.Builder{} + xxlJobLog := &dbstruct.XxlJobLog{ + Id: goproto.Int64(param.LogID), } + go func() { + // 刷新批次号 + batchId := imageaudit.RefreshBatchId() + // 执行图像审核 + successNum, failNum, err := imageaudit.Run(batchId) + if err != nil { + handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err)) + } else { + handleMsg.WriteString(fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)) + } + // 同步图像审核结果 + err = DefaultService.utilSyncImageAuditTaskResultByBatchId(&gin.Context{}, batchId) + if err != nil { + handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed to syncronize, err :%v", batchId, err)) + } + + if handleMsg.Cap() > 0 { + xxlJobLog.HandleMsg = goproto.String(handleMsg.String()) + err1 := _DefaultXxlJob.UpdateXxlJobLog(&gin.Context{}, xxlJobLog) + if err1 != nil { + logger.Error("UpdateXxlJobLog fail, err: %v", err1) + } + } + }() + logger.Info("Image audit batch ends...") - return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum) + return handleMsg.String() } func (s *CronService) ImageAuditBatchHis(ctx context.Context, param *xxl.RunReq) (msg string) { logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) - // 获取批次号 - batchId := param.ExecutorParams - // 执行图像审核 - successNum, failNum, err := imageaudit.Run(batchId) - // 同步图像审核结果 - err1 := DefaultService.utilSyncImageAuditTaskResultByBatchId(&gin.Context{}, batchId) - if err1 != nil { - return fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed to syncronize, err :%v", batchId, err) - } - if err != nil { - return fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) + handleMsg := &strings.Builder{} + xxlJobLog := &dbstruct.XxlJobLog{ + Id: goproto.Int64(param.LogID), } + + go func() { + // 刷新批次号 + batchId := imageaudit.RefreshBatchId() + // 执行图像审核 + successNum, failNum, err := imageaudit.Run(batchId) + if err != nil { + handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err)) + } else { + handleMsg.WriteString(fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)) + } + // 同步图像审核结果 + err = DefaultService.utilSyncImageAuditTaskResultByBatchId(&gin.Context{}, batchId) + if err != nil { + handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed to syncronize, err :%v", batchId, err)) + } + + if handleMsg.Cap() > 0 { + xxlJobLog.HandleMsg = goproto.String(handleMsg.String()) + err1 := _DefaultXxlJob.UpdateXxlJobLog(&gin.Context{}, xxlJobLog) + if err1 != nil { + logger.Error("UpdateXxlJobLog fail, err: %v", err1) + } + } + }() + logger.Info("Image audit batch ends...") - return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum) + return handleMsg.String() } func (s *CronService) TextAuditBatch(ctx context.Context, param *xxl.RunReq) (msg string) { logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) - // 刷新批次号 - batchId := textaudit.RefreshBatchId() - // 执行文字审核 - successNum, failNum, err := textaudit.Run(batchId) - // 同步文字审核结果 - err1 := DefaultService.utilSyncTextAuditTaskResultByBatchId(&gin.Context{}, batchId) - if err1 != nil { - return fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed to syncronize, err :%v", batchId, err) - } - if err != nil { - return fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) + handleMsg := &strings.Builder{} + xxlJobLog := &dbstruct.XxlJobLog{ + Id: goproto.Int64(param.LogID), } + + go func() { + // 刷新批次号 + batchId := textaudit.RefreshBatchId() + // 执行文字审核 + successNum, failNum, err := textaudit.Run(batchId) + if err != nil { + handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)) + } else { + handleMsg.WriteString(fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)) + } + // 同步文字审核结果 + err = DefaultService.utilSyncTextAuditTaskResultByBatchId(&gin.Context{}, batchId) + if err != nil { + handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed to syncronize, err :%v", batchId, err)) + } + + if handleMsg.Cap() > 0 { + xxlJobLog.HandleMsg = goproto.String(handleMsg.String()) + err1 := _DefaultXxlJob.UpdateXxlJobLog(&gin.Context{}, xxlJobLog) + if err1 != nil { + logger.Error("UpdateXxlJobLog fail, err: %v", err1) + } + } + }() + logger.Info("Text audit batch ends...") - return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum) + return handleMsg.String() } func (s *CronService) TextAuditBatchHis(ctx context.Context, param *xxl.RunReq) (msg string) { logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) - // 获取批次号 - batchId := param.ExecutorParams - // 执行文字审核 - successNum, failNum, err := textaudit.Run(batchId) - // 同步文字审核结果 - err1 := DefaultService.utilSyncTextAuditTaskResultByBatchId(&gin.Context{}, batchId) - if err1 != nil { - return fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed to syncronize, err :%v", batchId, err) - } - if err != nil { - return fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err) + handleMsg := &strings.Builder{} + xxlJobLog := &dbstruct.XxlJobLog{ + Id: goproto.Int64(param.LogID), } + + go func() { + // 刷新批次号 + batchId := textaudit.RefreshBatchId() + // 执行文字审核 + successNum, failNum, err := textaudit.Run(batchId) + if err != nil { + handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)) + } + // 同步文字审核结果 + err = DefaultService.utilSyncTextAuditTaskResultByBatchId(&gin.Context{}, batchId) + if err != nil { + handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed to syncronize, err :%v", batchId, err)) + } else { + handleMsg.WriteString(fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)) + } + + if handleMsg.Cap() > 0 { + xxlJobLog.HandleMsg = goproto.String(handleMsg.String()) + err1 := _DefaultXxlJob.UpdateXxlJobLog(&gin.Context{}, xxlJobLog) + if err1 != nil { + logger.Error("UpdateXxlJobLog fail, err: %v", err1) + } + } + }() + logger.Info("Text audit batch ends...") - return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum) + return handleMsg.String() } // 每天0点定时清空图像和文字审核作业的日志 diff --git a/dbstruct/xxljob_mysql.go b/dbstruct/xxljob_mysql.go index a4aceddd..5fb93c1a 100644 --- a/dbstruct/xxljob_mysql.go +++ b/dbstruct/xxljob_mysql.go @@ -17,3 +17,94 @@ type XxlJobLog struct { HandleMsg *string `json:"handle_msg" db:"handle_msg"` AlarmStatus *int32 `json:"alarm_status" db:"alarm_status"` } + +func (xxlJobLog *XxlJobLog) GetId() int64 { + if xxlJobLog != nil && xxlJobLog.Id != nil { + return *xxlJobLog.Id + } + return 0 +} +func (xxlJobLog *XxlJobLog) GetJobGroup() int32 { + if xxlJobLog != nil && xxlJobLog.JobGroup != nil { + return *xxlJobLog.JobGroup + } + return 0 +} +func (xxlJobLog *XxlJobLog) GetJobId() int32 { + if xxlJobLog != nil && xxlJobLog.JobId != nil { + return *xxlJobLog.JobId + } + return 0 +} +func (xxlJobLog *XxlJobLog) GetExecutorAddress() string { + if xxlJobLog != nil && xxlJobLog.ExecutorAddress != nil { + return *xxlJobLog.ExecutorAddress + } + return "" +} +func (xxlJobLog *XxlJobLog) GetExecutorHandler() string { + if xxlJobLog != nil && xxlJobLog.ExecutorHandler != nil { + return *xxlJobLog.ExecutorHandler + } + return "" +} +func (xxlJobLog *XxlJobLog) GetExecutorParam() string { + if xxlJobLog != nil && xxlJobLog.ExecutorParam != nil { + return *xxlJobLog.ExecutorParam + } + return "" +} +func (xxlJobLog *XxlJobLog) GetExecutorShardingParam() string { + if xxlJobLog != nil && xxlJobLog.ExecutorShardingParam != nil { + return *xxlJobLog.ExecutorShardingParam + } + return "" +} +func (xxlJobLog *XxlJobLog) GetExecutorFailRetryCount() int32 { + if xxlJobLog != nil && xxlJobLog.ExecutorFailRetryCount != nil { + return *xxlJobLog.ExecutorFailRetryCount + } + return 0 +} +func (xxlJobLog *XxlJobLog) GetTriggerTime() string { + if xxlJobLog != nil && xxlJobLog.TriggerTime != nil { + return *xxlJobLog.TriggerTime + } + return "" +} +func (xxlJobLog *XxlJobLog) GetTriggerCode() int32 { + if xxlJobLog != nil && xxlJobLog.TriggerCode != nil { + return *xxlJobLog.TriggerCode + } + return 0 +} +func (xxlJobLog *XxlJobLog) GetTriggerMsg() string { + if xxlJobLog != nil && xxlJobLog.TriggerMsg != nil { + return *xxlJobLog.TriggerMsg + } + return "" +} +func (xxlJobLog *XxlJobLog) GetHandleTime() string { + if xxlJobLog != nil && xxlJobLog.HandleTime != nil { + return *xxlJobLog.HandleTime + } + return "" +} +func (xxlJobLog *XxlJobLog) GetHandleCode() int32 { + if xxlJobLog != nil && xxlJobLog.HandleCode != nil { + return *xxlJobLog.HandleCode + } + return 0 +} +func (xxlJobLog *XxlJobLog) GetHandleMsg() string { + if xxlJobLog != nil && xxlJobLog.HandleMsg != nil { + return *xxlJobLog.HandleMsg + } + return "" +} +func (xxlJobLog *XxlJobLog) GetAlarmStatus() int32 { + if xxlJobLog != nil && xxlJobLog.AlarmStatus != nil { + return *xxlJobLog.AlarmStatus + } + return 0 +}