by Robin at 20240226;

This commit is contained in:
Leufolium 2024-02-26 08:03:37 +08:00
parent 3beeb4c7e6
commit 6fb4fe002e
6 changed files with 374 additions and 84 deletions

View File

@ -856,6 +856,99 @@ func (m *Mysql) GetSuccessXxlJobLogs(ctx *gin.Context, tx *sqlx.Tx, jobIdsStr st
return
}
// 更新指定任务的任务信息
func (m *Mysql) UpdateXxlJobLog(ctx *gin.Context, tx *sqlx.Tx, xxlJobLog *dbstruct.XxlJobLog) (err error) {
setClauses := make([]string, 0)
setValue := make([]any, 0)
if xxlJobLog.JobGroup != nil {
setClauses = append(setClauses, " job_group = ? ")
setValue = append(setValue, xxlJobLog.GetJobGroup())
}
if xxlJobLog.JobId != nil {
setClauses = append(setClauses, " job_id = ? ")
setValue = append(setValue, xxlJobLog.GetJobId())
}
if xxlJobLog.ExecutorAddress != nil {
setClauses = append(setClauses, " executor_address = ? ")
setValue = append(setValue, xxlJobLog.GetExecutorAddress())
}
if xxlJobLog.ExecutorHandler != nil {
setClauses = append(setClauses, " executor_handler = ? ")
setValue = append(setValue, xxlJobLog.GetExecutorHandler())
}
if xxlJobLog.ExecutorParam != nil {
setClauses = append(setClauses, " executor_param = ? ")
setValue = append(setValue, xxlJobLog.GetExecutorParam())
}
if xxlJobLog.ExecutorShardingParam != nil {
setClauses = append(setClauses, " executor_sharding_param = ? ")
setValue = append(setValue, xxlJobLog.GetExecutorShardingParam())
}
if xxlJobLog.ExecutorFailRetryCount != nil {
setClauses = append(setClauses, " executor_fail_retry_count = ? ")
setValue = append(setValue, xxlJobLog.GetExecutorFailRetryCount())
}
if xxlJobLog.TriggerTime != nil {
setClauses = append(setClauses, " trigger_time = ? ")
setValue = append(setValue, xxlJobLog.GetTriggerTime())
}
if xxlJobLog.TriggerCode != nil {
setClauses = append(setClauses, " trigger_code = ? ")
setValue = append(setValue, xxlJobLog.GetTriggerCode())
}
if xxlJobLog.TriggerMsg != nil {
setClauses = append(setClauses, " trigger_msg = ? ")
setValue = append(setValue, xxlJobLog.GetTriggerMsg())
}
if xxlJobLog.HandleTime != nil {
setClauses = append(setClauses, " handle_time = ? ")
setValue = append(setValue, xxlJobLog.GetHandleTime())
}
if xxlJobLog.HandleCode != nil {
setClauses = append(setClauses, " handle_code = ? ")
setValue = append(setValue, xxlJobLog.GetHandleCode())
}
if xxlJobLog.HandleMsg != nil {
setClauses = append(setClauses, " handle_msg = ? ")
setValue = append(setValue, xxlJobLog.GetHandleMsg())
}
if xxlJobLog.AlarmStatus != nil {
setClauses = append(setClauses, " alarm_status = ? ")
setValue = append(setValue, xxlJobLog.GetAlarmStatus())
}
if len(setValue) == 0 {
return
}
setValue = append(setValue, xxlJobLog.GetId())
setClause := &strings.Builder{}
for i, clause := range setClauses {
if i > 0 {
setClause.WriteString(",")
}
setClause.WriteString(clause)
}
sqlStr := fmt.Sprintf("update %s set %s where id = ?", TableXxlJobLog, setClause)
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr, setValue...)
} else {
db := m.getDBXxlJob()
_, err = db.ExecContext(ctx, sqlStr, setValue...)
}
if err == sql.ErrNoRows {
err = nil
return
}
if err != nil {
return
}
return
}
// 删除指定任务中所有已经执行成功的xxl_job任务
func (m *Mysql) DeleteSuccessXxlJobLogs(ctx *gin.Context, tx *sqlx.Tx, ids []int64) (err error) {
if len(ids) == 0 {

View File

@ -52,7 +52,7 @@ func (s *CronService) Init(c any) (err error) {
exec.RegTask("image_audit_batch_his", s.ImageAuditBatchHis)
exec.RegTask("text_audit_batch", s.TextAuditBatch)
exec.RegTask("text_audit_batch_his", s.TextAuditBatchHis)
exec.RegTask("clear_content_audit_batch_execution_logs", s.ClearContentAuditBatchExecutionLogs)
//exec.RegTask("clear_content_audit_batch_execution_logs", s.ClearContentAuditBatchExecutionLogs)
exec.LogHandler(customLogHandle)
//注册任务handler

View File

@ -10,6 +10,7 @@ import (
"service/dbstruct"
"github.com/gin-gonic/gin"
goproto "google.golang.org/protobuf/proto"
)
var DefaultImageAuditTaskResultHandler *ImageAuditTaskResultHandler
@ -115,47 +116,71 @@ func (handler *ImageAuditTaskResultHandler) generateStreamerAlbumUpdateFunc() {
}
}
// 动态表->图像内容,若不通过,则删掉审核不通过的图片
// 动态表->图像内容,若不通过,则删掉审核不通过的图片,图片仅剩一张则删除动态,若通过,则若动态已被删除,恢复该动态至之前状态
func (handler *ImageAuditTaskResultHandler) generateMomentMediaComponentUpdateFunc() {
handler.imageAuditTaskUpdateFuncGeneratorMap["moment|moment|media_component"] = func(ctx *gin.Context, task *dbstruct.ImageAuditTask, option int) func() error {
return func() error {
momentId := task.AssociativeTableId
var mediaComp *dbstruct.MediaComponent
if option == consts.ImageAuditTaskUpdate_Pass {
mediaComp = task.AuditedMedia
return passMomentImageAuditTask(ctx, task)
} else {
auditedImageIds := util.DerefInt64Slice(task.AuditedMedia.ImageIds)
imageIds := make([]int64, 0)
for i, pass := range task.AuditedMediaResults {
if pass {
imageIds = append(imageIds, auditedImageIds[i])
}
}
if len(imageIds) != 0 {
mediaComp = &dbstruct.MediaComponent{
ImageIds: &imageIds,
}
}
}
// 如果旧媒体内容为空,则删除这条动态
if mediaComp == nil {
return _DefaultMoment.OpDelete(ctx, util.DerefInt64(momentId))
} else {
if err := _DefaultMoment.OpUpdate(ctx, &momentproto.OpUpdateReq{
Moment: &dbstruct.Moment{
Id: momentId,
MediaComp: mediaComp,
},
}); err != nil {
return err
}
if err := _DefaultMomentAuditTask.OpUpdateByImageAuditTaskIds(ctx, &dbstruct.MomentAuditTask{
FinalMedia: mediaComp,
}, []string{util.DerefString(task.Id)}); err != nil {
return err
}
return nil
return rollbackMomentImageAuditTask(ctx, task)
}
}
}
}
// 通过动态表图像
func passMomentImageAuditTask(ctx *gin.Context, task *dbstruct.ImageAuditTask) (err error) {
if err := _DefaultMoment.OpUpdate(ctx, &momentproto.OpUpdateReq{
Moment: &dbstruct.Moment{
Id: task.AssociativeTableId,
MediaComp: task.AuditedMedia,
DelFlag: goproto.Int64(consts.Exist),
},
}); err != nil {
return err
}
if err := _DefaultMomentAuditTask.OpUpdateByImageAuditTaskIds(ctx, &dbstruct.MomentAuditTask{
FinalMedia: task.AuditedMedia,
}, []string{util.DerefString(task.Id)}); err != nil {
return err
}
return nil
}
// 回退动态表图像
func rollbackMomentImageAuditTask(ctx *gin.Context, task *dbstruct.ImageAuditTask) (err error) {
auditedImageIds := util.DerefInt64Slice(task.AuditedMedia.ImageIds)
imageIds := make([]int64, 0)
for i, pass := range task.AuditedMediaResults {
if pass {
imageIds = append(imageIds, auditedImageIds[i])
}
}
if len(imageIds) != 0 {
mediaComp := &dbstruct.MediaComponent{
ImageIds: &imageIds,
}
if err := _DefaultMoment.OpUpdate(ctx, &momentproto.OpUpdateReq{
Moment: &dbstruct.Moment{
Id: task.AssociativeTableId,
MediaComp: mediaComp,
},
}); err != nil {
return err
}
if err := _DefaultMomentAuditTask.OpUpdateByImageAuditTaskIds(ctx, &dbstruct.MomentAuditTask{
FinalMedia: mediaComp,
}, []string{util.DerefString(task.Id)}); err != nil {
return err
}
return nil
} else {
return _DefaultMoment.OpDelete(ctx, util.DerefInt64(task.AssociativeTableId))
}
}

View File

@ -6,6 +6,7 @@ import (
"os"
"service/app/mix/dao"
"service/bizcommon/util"
"service/dbstruct"
"service/library/configcenter"
"service/library/logger"
"time"
@ -25,6 +26,15 @@ func NewXxlJob(store *dao.Store, cfg *configcenter.XxlJobConfig) (x *XxlJob) {
}
}
func (x *XxlJob) UpdateXxlJobLog(ctx *gin.Context, xxlJobLog *dbstruct.XxlJobLog) (err error) {
err = x.store.UpdateXxlJobLog(ctx, nil, xxlJobLog)
if err != nil {
logger.Error("UpdateXxlJobLog fail, err: %v", err)
return
}
return
}
func (x *XxlJob) ClearSuccXxlJobLogs(ctx *gin.Context, jobIdsStr string, errorPrefix string) (err error) {
// 查出当天所有已成功的日志
list, err := x.store.GetSuccessXxlJobLogs(ctx, nil, jobIdsStr, errorPrefix)

View File

@ -139,75 +139,146 @@ func (s *CronService) CreateDailyStatement(ctx context.Context, param *xxl.RunRe
// 图像审核作业
func (s *CronService) ImageAuditBatch(ctx context.Context, param *xxl.RunReq) (msg string) {
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
// 刷新批次号
batchId := imageaudit.RefreshBatchId()
// 执行图像审核
successNum, failNum, err := imageaudit.Run(batchId)
// 同步图像审核结果
err1 := DefaultService.utilSyncImageAuditTaskResultByBatchId(&gin.Context{}, batchId)
if err1 != nil {
return fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed to syncronize, err :%v", batchId, err)
}
if err != nil {
return fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)
handleMsg := &strings.Builder{}
xxlJobLog := &dbstruct.XxlJobLog{
Id: goproto.Int64(param.LogID),
}
go func() {
// 刷新批次号
batchId := imageaudit.RefreshBatchId()
// 执行图像审核
successNum, failNum, err := imageaudit.Run(batchId)
if err != nil {
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err))
} else {
handleMsg.WriteString(fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum))
}
// 同步图像审核结果
err = DefaultService.utilSyncImageAuditTaskResultByBatchId(&gin.Context{}, batchId)
if err != nil {
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed to syncronize, err :%v", batchId, err))
}
if handleMsg.Cap() > 0 {
xxlJobLog.HandleMsg = goproto.String(handleMsg.String())
err1 := _DefaultXxlJob.UpdateXxlJobLog(&gin.Context{}, xxlJobLog)
if err1 != nil {
logger.Error("UpdateXxlJobLog fail, err: %v", err1)
}
}
}()
logger.Info("Image audit batch ends...")
return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)
return handleMsg.String()
}
func (s *CronService) ImageAuditBatchHis(ctx context.Context, param *xxl.RunReq) (msg string) {
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
// 获取批次号
batchId := param.ExecutorParams
// 执行图像审核
successNum, failNum, err := imageaudit.Run(batchId)
// 同步图像审核结果
err1 := DefaultService.utilSyncImageAuditTaskResultByBatchId(&gin.Context{}, batchId)
if err1 != nil {
return fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed to syncronize, err :%v", batchId, err)
}
if err != nil {
return fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)
handleMsg := &strings.Builder{}
xxlJobLog := &dbstruct.XxlJobLog{
Id: goproto.Int64(param.LogID),
}
go func() {
// 刷新批次号
batchId := imageaudit.RefreshBatchId()
// 执行图像审核
successNum, failNum, err := imageaudit.Run(batchId)
if err != nil {
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err))
} else {
handleMsg.WriteString(fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum))
}
// 同步图像审核结果
err = DefaultService.utilSyncImageAuditTaskResultByBatchId(&gin.Context{}, batchId)
if err != nil {
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed to syncronize, err :%v", batchId, err))
}
if handleMsg.Cap() > 0 {
xxlJobLog.HandleMsg = goproto.String(handleMsg.String())
err1 := _DefaultXxlJob.UpdateXxlJobLog(&gin.Context{}, xxlJobLog)
if err1 != nil {
logger.Error("UpdateXxlJobLog fail, err: %v", err1)
}
}
}()
logger.Info("Image audit batch ends...")
return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)
return handleMsg.String()
}
func (s *CronService) TextAuditBatch(ctx context.Context, param *xxl.RunReq) (msg string) {
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
// 刷新批次号
batchId := textaudit.RefreshBatchId()
// 执行文字审核
successNum, failNum, err := textaudit.Run(batchId)
// 同步文字审核结果
err1 := DefaultService.utilSyncTextAuditTaskResultByBatchId(&gin.Context{}, batchId)
if err1 != nil {
return fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed to syncronize, err :%v", batchId, err)
}
if err != nil {
return fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)
handleMsg := &strings.Builder{}
xxlJobLog := &dbstruct.XxlJobLog{
Id: goproto.Int64(param.LogID),
}
go func() {
// 刷新批次号
batchId := textaudit.RefreshBatchId()
// 执行文字审核
successNum, failNum, err := textaudit.Run(batchId)
if err != nil {
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err))
} else {
handleMsg.WriteString(fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum))
}
// 同步文字审核结果
err = DefaultService.utilSyncTextAuditTaskResultByBatchId(&gin.Context{}, batchId)
if err != nil {
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed to syncronize, err :%v", batchId, err))
}
if handleMsg.Cap() > 0 {
xxlJobLog.HandleMsg = goproto.String(handleMsg.String())
err1 := _DefaultXxlJob.UpdateXxlJobLog(&gin.Context{}, xxlJobLog)
if err1 != nil {
logger.Error("UpdateXxlJobLog fail, err: %v", err1)
}
}
}()
logger.Info("Text audit batch ends...")
return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)
return handleMsg.String()
}
func (s *CronService) TextAuditBatchHis(ctx context.Context, param *xxl.RunReq) (msg string) {
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
// 获取批次号
batchId := param.ExecutorParams
// 执行文字审核
successNum, failNum, err := textaudit.Run(batchId)
// 同步文字审核结果
err1 := DefaultService.utilSyncTextAuditTaskResultByBatchId(&gin.Context{}, batchId)
if err1 != nil {
return fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed to syncronize, err :%v", batchId, err)
}
if err != nil {
return fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)
handleMsg := &strings.Builder{}
xxlJobLog := &dbstruct.XxlJobLog{
Id: goproto.Int64(param.LogID),
}
go func() {
// 刷新批次号
batchId := textaudit.RefreshBatchId()
// 执行文字审核
successNum, failNum, err := textaudit.Run(batchId)
if err != nil {
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err))
}
// 同步文字审核结果
err = DefaultService.utilSyncTextAuditTaskResultByBatchId(&gin.Context{}, batchId)
if err != nil {
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed to syncronize, err :%v", batchId, err))
} else {
handleMsg.WriteString(fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum))
}
if handleMsg.Cap() > 0 {
xxlJobLog.HandleMsg = goproto.String(handleMsg.String())
err1 := _DefaultXxlJob.UpdateXxlJobLog(&gin.Context{}, xxlJobLog)
if err1 != nil {
logger.Error("UpdateXxlJobLog fail, err: %v", err1)
}
}
}()
logger.Info("Text audit batch ends...")
return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)
return handleMsg.String()
}
// 每天0点定时清空图像和文字审核作业的日志

View File

@ -17,3 +17,94 @@ type XxlJobLog struct {
HandleMsg *string `json:"handle_msg" db:"handle_msg"`
AlarmStatus *int32 `json:"alarm_status" db:"alarm_status"`
}
func (xxlJobLog *XxlJobLog) GetId() int64 {
if xxlJobLog != nil && xxlJobLog.Id != nil {
return *xxlJobLog.Id
}
return 0
}
func (xxlJobLog *XxlJobLog) GetJobGroup() int32 {
if xxlJobLog != nil && xxlJobLog.JobGroup != nil {
return *xxlJobLog.JobGroup
}
return 0
}
func (xxlJobLog *XxlJobLog) GetJobId() int32 {
if xxlJobLog != nil && xxlJobLog.JobId != nil {
return *xxlJobLog.JobId
}
return 0
}
func (xxlJobLog *XxlJobLog) GetExecutorAddress() string {
if xxlJobLog != nil && xxlJobLog.ExecutorAddress != nil {
return *xxlJobLog.ExecutorAddress
}
return ""
}
func (xxlJobLog *XxlJobLog) GetExecutorHandler() string {
if xxlJobLog != nil && xxlJobLog.ExecutorHandler != nil {
return *xxlJobLog.ExecutorHandler
}
return ""
}
func (xxlJobLog *XxlJobLog) GetExecutorParam() string {
if xxlJobLog != nil && xxlJobLog.ExecutorParam != nil {
return *xxlJobLog.ExecutorParam
}
return ""
}
func (xxlJobLog *XxlJobLog) GetExecutorShardingParam() string {
if xxlJobLog != nil && xxlJobLog.ExecutorShardingParam != nil {
return *xxlJobLog.ExecutorShardingParam
}
return ""
}
func (xxlJobLog *XxlJobLog) GetExecutorFailRetryCount() int32 {
if xxlJobLog != nil && xxlJobLog.ExecutorFailRetryCount != nil {
return *xxlJobLog.ExecutorFailRetryCount
}
return 0
}
func (xxlJobLog *XxlJobLog) GetTriggerTime() string {
if xxlJobLog != nil && xxlJobLog.TriggerTime != nil {
return *xxlJobLog.TriggerTime
}
return ""
}
func (xxlJobLog *XxlJobLog) GetTriggerCode() int32 {
if xxlJobLog != nil && xxlJobLog.TriggerCode != nil {
return *xxlJobLog.TriggerCode
}
return 0
}
func (xxlJobLog *XxlJobLog) GetTriggerMsg() string {
if xxlJobLog != nil && xxlJobLog.TriggerMsg != nil {
return *xxlJobLog.TriggerMsg
}
return ""
}
func (xxlJobLog *XxlJobLog) GetHandleTime() string {
if xxlJobLog != nil && xxlJobLog.HandleTime != nil {
return *xxlJobLog.HandleTime
}
return ""
}
func (xxlJobLog *XxlJobLog) GetHandleCode() int32 {
if xxlJobLog != nil && xxlJobLog.HandleCode != nil {
return *xxlJobLog.HandleCode
}
return 0
}
func (xxlJobLog *XxlJobLog) GetHandleMsg() string {
if xxlJobLog != nil && xxlJobLog.HandleMsg != nil {
return *xxlJobLog.HandleMsg
}
return ""
}
func (xxlJobLog *XxlJobLog) GetAlarmStatus() int32 {
if xxlJobLog != nil && xxlJobLog.AlarmStatus != nil {
return *xxlJobLog.AlarmStatus
}
return 0
}