From 057badc5c1741d8c2bf5212cc6289c7d2b48d64b Mon Sep 17 00:00:00 2001 From: Robin <7434053+warrior_of_light_robin@user.noreply.gitee.com> Date: Tue, 24 Dec 2024 14:46:49 +0800 Subject: [PATCH] by Robin at 20241224 --- app/mix/cmd_offline/main.go | 1 + app/mix/controller/init_offline.go | 2 +- .../controller/video_moderation_callback.go | 2 +- app/mix/service/cronservice.go | 6 +++ .../service/imageaudittask_result_handler.go | 12 ++++- app/mix/service/notif_builder_handler.go | 16 ++++++ .../service/textaudittask_result_handler.go | 5 ++ app/mix/service/xxljob_tasks.go | 49 ++++++++++++++---- library/contentaudit/imageaudit/client.go | 2 +- .../contentaudit/imageaudit/control_block.go | 13 +++-- library/contentaudit/imageaudit/imageaudit.go | 50 ++++++++----------- library/contentaudit/textaudit/client.go | 2 +- .../contentaudit/textaudit/control_block.go | 11 ++-- library/contentaudit/textaudit/textaudit.go | 43 +++++++--------- .../contentaudit/video_moderation/client.go | 2 +- .../video_moderation/control_block.go | 3 +- .../contentaudit/video_moderation/handler.go | 36 ++++++------- .../video_moderation/video_moderation.go | 24 ++++----- 18 files changed, 164 insertions(+), 115 deletions(-) diff --git a/app/mix/cmd_offline/main.go b/app/mix/cmd_offline/main.go index 8851f1ef..53cd6c67 100644 --- a/app/mix/cmd_offline/main.go +++ b/app/mix/cmd_offline/main.go @@ -117,6 +117,7 @@ func main() { router := httpengine.NewRouter() middleware.InitJwtAuthenticator(service.DefaultService.OpVerifyToken) service.DefaultService.ConnectToNotifCenter(middleware.InitNotifSender) + service.DefaultCronService.ConnectToNotifSender(middleware.DefaultNotifSender) validator.InitDefaultNotNullValidator() controller.InitOffline(router) srv := &http.Server{ diff --git a/app/mix/controller/init_offline.go b/app/mix/controller/init_offline.go index 119c1b01..db8933a2 100644 --- a/app/mix/controller/init_offline.go +++ b/app/mix/controller/init_offline.go @@ -16,5 +16,5 @@ func InitOffline(r *gin.Engine) { // 视频审核callback extVideoModerationGroup := r.Group("/offline_ext/video_moderation") - extVideoModerationGroup.POST("callback", middleware.FORMParamValidator(video_moderation_proto.ExtVideoModerationReq{}), VideoModerationCallback) + extVideoModerationGroup.POST("callback", middleware.FORMParamValidator(video_moderation_proto.ExtVideoModerationReq{}), VideoModerationCallback, middleware.NotifSender()) } diff --git a/app/mix/controller/video_moderation_callback.go b/app/mix/controller/video_moderation_callback.go index d11c32a7..a6648779 100644 --- a/app/mix/controller/video_moderation_callback.go +++ b/app/mix/controller/video_moderation_callback.go @@ -18,7 +18,7 @@ func VideoModerationCallback(ctx *gin.Context) { return } - err = videomoderation.HandleVideoModerationContent(req.Content) + err = videomoderation.HandleVideoModerationContent(ctx, req.Content) if err != nil { logger.Error("HandleVideoModerationContent fail, req: %v, err: %v", util.ToJson(req), err) return diff --git a/app/mix/service/cronservice.go b/app/mix/service/cronservice.go index d3a3b023..9f215ba3 100644 --- a/app/mix/service/cronservice.go +++ b/app/mix/service/cronservice.go @@ -7,6 +7,7 @@ import ( "service/library/logger" "strings" + "github.com/gin-gonic/gin" xxl "github.com/xxl-job/xxl-job-executor-go" ) @@ -26,6 +27,7 @@ type ServerConnInfo struct { type CronService struct { fileAbsPath string serverConnInfos []*ServerConnInfo + notifSender gin.HandlerFunc } func NewCronService() *CronService { @@ -94,6 +96,10 @@ func (s *CronService) Init(c any) (exec xxl.Executor, err error) { return } +func (s *CronService) ConnectToNotifSender(notifSender gin.HandlerFunc) { + s.notifSender = notifSender +} + // 自定义日志处理器 func customLogHandle(req *xxl.LogReq) *xxl.LogRes { return &xxl.LogRes{Code: xxl.SuccessCode, Msg: "", Content: xxl.LogResContent{ diff --git a/app/mix/service/imageaudittask_result_handler.go b/app/mix/service/imageaudittask_result_handler.go index 790b96d4..88a5d9a1 100644 --- a/app/mix/service/imageaudittask_result_handler.go +++ b/app/mix/service/imageaudittask_result_handler.go @@ -75,12 +75,22 @@ func (handler *ImageAuditTaskResultHandler) generateAccountAvatarUpdateFunc() { } else { avatar = task.OldMedia } - return _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{ + + err := _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{ Account: &dbstruct.Account{ Mid: mid, Avatar: avatar, }, }) + if err != nil { + return err + } + + // 机审回退增加自动消息发送 + if option == consts.ImageAuditTaskUpdate_Rollback { + DefaultNotifBuilderHandler.Handle(ctx)(consts.AudNotifTemp_AvatarRollbacked)(task.GetAssociativeTableId()) + } + return nil } } } diff --git a/app/mix/service/notif_builder_handler.go b/app/mix/service/notif_builder_handler.go index 32e93c62..79b1bbd4 100644 --- a/app/mix/service/notif_builder_handler.go +++ b/app/mix/service/notif_builder_handler.go @@ -62,7 +62,9 @@ func (handler *NotifBuilderHandler) init() { handler.handleSysMembershipPurchased() // 注册审核通知处理 handler.handleAudAvatarChangeApplied() + handler.handleAudAvatarRollbacked() handler.handleAudNameChangeApplied() + handler.handleAudNameRollbacked() handler.handleAudStreamerBasicInfoApplied() handler.handleAudStreamerBasicInfoPassed() handler.handleAudStreamerBasicInfoRejected() @@ -288,6 +290,13 @@ func (handler *NotifBuilderHandler) handleAudAvatarChangeApplied() { } } +func (handler *NotifBuilderHandler) handleAudAvatarRollbacked() { + handler.handlerMap[consts.AudNotifTemp_AvatarRollbacked] = func(ctx *gin.Context, args ...any) { + mid := args[0].(int64) + DefaultService.utilWriteNotifInfo(ctx, consts.AudNotifTemp_AvatarRollbacked, mid) + } +} + func (handler *NotifBuilderHandler) handleAudNameChangeApplied() { handler.handlerMap[consts.AudNotifTemp_NameChangeApplied] = func(ctx *gin.Context, args ...any) { account := args[0].(*dbstruct.Account) @@ -297,6 +306,13 @@ func (handler *NotifBuilderHandler) handleAudNameChangeApplied() { } } +func (handler *NotifBuilderHandler) handleAudNameRollbacked() { + handler.handlerMap[consts.AudNotifTemp_NameRollbacked] = func(ctx *gin.Context, args ...any) { + mid := args[0].(int64) + DefaultService.utilWriteNotifInfo(ctx, consts.AudNotifTemp_NameRollbacked, mid) + } +} + func (handler *NotifBuilderHandler) handleAudStreamerBasicInfoApplied() { handler.handlerMap[consts.AudNotifTemp_StreamerBasicInfoApplied] = func(ctx *gin.Context, args ...any) { streamerBasic := args[0].(*dbstruct.StreamerAuthApprovalBasic) diff --git a/app/mix/service/textaudittask_result_handler.go b/app/mix/service/textaudittask_result_handler.go index b34bc7dc..b7b69ea3 100644 --- a/app/mix/service/textaudittask_result_handler.go +++ b/app/mix/service/textaudittask_result_handler.go @@ -105,6 +105,11 @@ func (handler *TextAuditTaskResultHandler) generateAccountNameUpdateFunc() { }) } + // 机审回退增加自动消息发送;仅针对用户 + if option == consts.TextAuditTaskUpdate_Rollback && acct.GetRole() == consts.User { + DefaultNotifBuilderHandler.Handle(ctx)(consts.AudNotifTemp_NameRollbacked)(task.GetAssociativeTableId()) + } + return nil } } diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go index 8e1b2f43..bf992ce0 100644 --- a/app/mix/service/xxljob_tasks.go +++ b/app/mix/service/xxljob_tasks.go @@ -198,19 +198,24 @@ func (s *CronService) ImageAuditBatch(ctx context.Context, param *xxl.RunReq) (m logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) handleMsg := &strings.Builder{} + ctxt := &gin.Context{} + // 刷新批次号 - batchId, err := imageaudit.RefreshBatchId() + batchId, err := imageaudit.RefreshBatchId(ctxt) if err != nil { return fmt.Sprintf("Refresh batchId failed, err: %v", err) } // 执行图像审核 - successNum, failNum, err := imageaudit.Run(batchId) + successNum, failNum, err := imageaudit.Run(ctxt, batchId) if err != nil { handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err)) } else { handleMsg.WriteString(fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)) } + // 发送通知 + s.notifSender(ctxt) + logger.Info("Image audit batch ends...") return handleMsg.String() } @@ -218,16 +223,22 @@ func (s *CronService) ImageAuditBatch(ctx context.Context, param *xxl.RunReq) (m func (s *CronService) ImageAuditBatchHis(ctx context.Context, param *xxl.RunReq) (msg string) { logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) handleMsg := &strings.Builder{} + + ctxt := &gin.Context{} + // 刷新批次号 batchId := param.ExecutorParams // 执行图像审核 - successNum, failNum, err := imageaudit.Run(batchId) + successNum, failNum, err := imageaudit.Run(ctxt, batchId) if err != nil { handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err)) } else { handleMsg.WriteString(fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)) } + // 发送通知 + s.notifSender(ctxt) + logger.Info("Image audit batch ends...") return handleMsg.String() } @@ -236,20 +247,24 @@ func (s *CronService) TextAuditBatch(ctx context.Context, param *xxl.RunReq) (ms logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) handleMsg := &strings.Builder{} + ctxt := &gin.Context{} + // 刷新批次号 - batchId, err := textaudit.RefreshBatchId() + batchId, err := textaudit.RefreshBatchId(ctxt) if err != nil { return fmt.Sprintf("Refresh batchId failed, err: %v", err) } - // 执 // 执行文字审核 - successNum, failNum, err := textaudit.Run(batchId) + successNum, failNum, err := textaudit.Run(ctxt, batchId) if err != nil { handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)) } else { handleMsg.WriteString(fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)) } + // 发送通知 + s.notifSender(ctxt) + logger.Info("Text audit batch ends...") return handleMsg.String() } @@ -258,16 +273,21 @@ func (s *CronService) TextAuditBatchHis(ctx context.Context, param *xxl.RunReq) logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) handleMsg := &strings.Builder{} + ctxt := &gin.Context{} + // 刷新批次号 batchId := param.ExecutorParams // 执行文字审核 - successNum, failNum, err := textaudit.Run(batchId) + successNum, failNum, err := textaudit.Run(ctxt, batchId) if err != nil { handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)) } else { handleMsg.WriteString(fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)) } + // 发送通知 + s.notifSender(ctxt) + logger.Info("Text audit batch ends...") return handleMsg.String() } @@ -524,19 +544,23 @@ func (s *CronService) VideoModerationBatch(ctx context.Context, param *xxl.RunRe logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) handleMsg := &strings.Builder{} + ctxt := &gin.Context{} + // 刷新批次号 - batchId, err := videomoderation.RefreshBatchId() + batchId, err := videomoderation.RefreshBatchId(ctxt) if err != nil { return fmt.Sprintf("Refresh batchId failed, err: %v", err) } // 执行视频审核 - successNum, failNum, err := videomoderation.Run(batchId) + successNum, failNum, err := videomoderation.Run(ctxt, batchId) if err != nil { handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, video moderation tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err)) } else { handleMsg.WriteString(fmt.Sprintf("batchId : %v, video moderation tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)) } + // 视频审核作业有回调函数,在回调函数后触发消息发送 + logger.Info("Video moderation batch ends...") return handleMsg.String() } @@ -544,16 +568,21 @@ func (s *CronService) VideoModerationBatch(ctx context.Context, param *xxl.RunRe func (s *CronService) VideoModerationBatchHis(ctx context.Context, param *xxl.RunReq) (msg string) { logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) handleMsg := &strings.Builder{} + + ctxt := &gin.Context{} + // 刷新批次号 batchId := param.ExecutorParams // 执行视频审核 - successNum, failNum, err := videomoderation.Run(batchId) + successNum, failNum, err := videomoderation.Run(ctxt, batchId) if err != nil { handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, video moderation tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err)) } else { handleMsg.WriteString(fmt.Sprintf("batchId : %v, video moderation tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)) } + // 视频审核作业有回调函数,在回调函数后触发消息发送 + logger.Info("Video moderation batch ends...") return handleMsg.String() } diff --git a/library/contentaudit/imageaudit/client.go b/library/contentaudit/imageaudit/client.go index 4afcd34d..32aa638c 100644 --- a/library/contentaudit/imageaudit/client.go +++ b/library/contentaudit/imageaudit/client.go @@ -77,7 +77,7 @@ func Init(cfg *configcenter.ImageAuditConfig) (err error) { } // batch_id - _, err = RefreshBatchId() + _, err = RefreshBatchId(&gin.Context{}) if err != nil { logger.Error("RefreshBatchId fail, cfg: %v, err: %v", util.ToJson(config), err) return diff --git a/library/contentaudit/imageaudit/control_block.go b/library/contentaudit/imageaudit/control_block.go index 2ec3344d..df9b4f87 100644 --- a/library/contentaudit/imageaudit/control_block.go +++ b/library/contentaudit/imageaudit/control_block.go @@ -21,7 +21,7 @@ type ImageAuditTaskBatchControlBlock struct { ActionMap map[string]*ImageAuditAction // 动作Id号-action的map } -func NewImageAuditTaskBatchControlBlock(tasks []*dbstruct.ImageAuditTask, batchId string) *ImageAuditTaskBatchControlBlock { +func NewImageAuditTaskBatchControlBlock(ctx *gin.Context, tasks []*dbstruct.ImageAuditTask, batchId string) *ImageAuditTaskBatchControlBlock { if len(tasks) == 0 { return nil } @@ -40,17 +40,16 @@ func NewImageAuditTaskBatchControlBlock(tasks []*dbstruct.ImageAuditTask, batchI for _, task := range tasks { taskCtrlBlock := NewImageAuditTaskControlBlock(task) ctrlBlock.TaskCtrlBlocks = append(ctrlBlock.TaskCtrlBlocks, taskCtrlBlock) - ctrlBlock.RecordAction(taskCtrlBlock) + ctrlBlock.RecordAction(ctx, taskCtrlBlock) imageIndex, taskIndex = ctrlBlock.RecordImage(taskCtrlBlock, &mediaFillables, imageIndex, taskIndex) } - mediafiller.FillListInternal(&gin.Context{}, mediaFillables) + mediafiller.FillListInternal(ctx, mediaFillables) return ctrlBlock } -func (ctrlBlock *ImageAuditTaskBatchControlBlock) RecordAction(taskCtrlBlock *ImageAuditTaskControlBlock) { - ctx := &gin.Context{} +func (ctrlBlock *ImageAuditTaskBatchControlBlock) RecordAction(ctx *gin.Context, taskCtrlBlock *ImageAuditTaskControlBlock) { // 写map记录 if ctrlBlock.ActionMap[taskCtrlBlock.ActionId] == nil { // 写入ctrlBlock.ActionMap ctrlBlock.ActionMap[taskCtrlBlock.ActionId] = NewImageAuditAction() @@ -107,8 +106,8 @@ type ImageAuditTaskControlBlock struct { // 新建图像审核任务块 func NewImageAuditTaskControlBlock(task *dbstruct.ImageAuditTask) (tcb *ImageAuditTaskControlBlock) { tcb = &ImageAuditTaskControlBlock{ - ActionId: fmt.Sprintf("%v%v%v%v", util.DerefString(task.AssociativeDatabase), util.DerefString(task.AssociativeTableName), - util.DerefInt64(task.AssociativeTableId), util.DerefString(task.AssociativeTableColumn)), + ActionId: fmt.Sprintf("%v%v%v%v", task.GetAssociativeDatabase(), task.GetAssociativeTableName(), + task.GetAssociativeTableId(), task.GetAssociativeTableColumn()), ImageAuditTask: task, IsTaskPassed: true, AuditedFragmentsNum: 0, diff --git a/library/contentaudit/imageaudit/imageaudit.go b/library/contentaudit/imageaudit/imageaudit.go index b247281c..61ab33df 100644 --- a/library/contentaudit/imageaudit/imageaudit.go +++ b/library/contentaudit/imageaudit/imageaudit.go @@ -19,8 +19,8 @@ import ( ) // 刷新批次号 -func RefreshBatchId() (string, error) { - batchId, err := _DefaultContentAuditRTI.GetAndUpdateImageAuditBatchId(&gin.Context{}, genereteBatchId()) +func RefreshBatchId(ctx *gin.Context) (string, error) { + batchId, err := _DefaultContentAuditRTI.GetAndUpdateImageAuditBatchId(ctx, genereteBatchId()) if err != nil { logger.Info("_DefaultContentAuditRTI GetAndUpdateImageAuditBatchId fail: %v", err) return "", err @@ -29,10 +29,10 @@ func RefreshBatchId() (string, error) { } // 图像审核主逻辑 -func Run(batchId string) (successNum int, failNum int, err error) { +func Run(ctx *gin.Context, batchId string) (successNum int, failNum int, err error) { // 查询该批次所有审核任务 - imageaudittasks, err := _DefaultImageAuditTask.OpList(&gin.Context{}, &imageaudittaskproto.OpListReq{ + imageaudittasks, err := _DefaultImageAuditTask.OpList(ctx, &imageaudittaskproto.OpListReq{ BatchId: goproto.String(batchId), Status: goproto.Int64(consts.ImageAudit_Created), Sort: "ct", @@ -47,14 +47,14 @@ func Run(batchId string) (successNum int, failNum int, err error) { logger.Info("Image audit batch started, batchId : %v, task number : %v", batchId, len(imageaudittasks)) // 1.构建批量任务控制块,初始化必要的actionMap、填充送审image等信息 - ctrlBlock := NewImageAuditTaskBatchControlBlock(imageaudittasks, batchId) + ctrlBlock := NewImageAuditTaskBatchControlBlock(ctx, imageaudittasks, batchId) // 2.创建请求 // oss不在上海的服务器,需要调用Advance接口 reqs, err := createScanImageAdvanceRequest(ctrlBlock) if err != nil { logger.Info("Create Scan ImageRequest fail: %v", err) - handleBatchError(ctrlBlock, err) + handleBatchError(ctx, ctrlBlock, err) failNum = len(imageaudittasks) return } @@ -72,7 +72,7 @@ func Run(batchId string) (successNum int, failNum int, err error) { if _t, ok := err.(*tea.SDKError); ok { logger.Error("ScanImageAdvance fail, errinfo : %v", util.DerefString(_t.Data)) } - handleBatchError(ctrlBlock, err) + handleBatchError(ctx, ctrlBlock, err) failNum = len(imageaudittasks) return } @@ -81,7 +81,7 @@ func Run(batchId string) (successNum int, failNum int, err error) { } // 4.处理应答 - err = handleScanImageResponseBodyDataResults(ctrlBlock, results) + err = handleScanImageResponseBodyDataResults(ctx, ctrlBlock, results) successNum = len(imageaudittasks) return @@ -131,7 +131,7 @@ func createScanImageAdvanceRequest(ctrlBlock *ImageAuditTaskBatchControlBlock) ( return } -func handleScanImageResponseBodyDataResults(ctrlBlock *ImageAuditTaskBatchControlBlock, results []*imageaudit.ScanImageResponseBodyDataResults) (err error) { +func handleScanImageResponseBodyDataResults(ctx *gin.Context, ctrlBlock *ImageAuditTaskBatchControlBlock, results []*imageaudit.ScanImageResponseBodyDataResults) (err error) { taskCtrlBlocks := ctrlBlock.TaskCtrlBlocks img2taskIndexMap := ctrlBlock.Img2taskIndexMap actionMap := ctrlBlock.ActionMap @@ -147,7 +147,7 @@ func handleScanImageResponseBodyDataResults(ctrlBlock *ImageAuditTaskBatchContro // 2.立即在imageaudit-imageaudit中更新该次审核结果 logger.Info("Handling its audit record...") - pass, imageAudit, err := handleImageAudit(dataId, result) + pass, imageAudit, err := handleImageAudit(ctx, dataId, result) if err != nil { logger.Error("handleImageAudit fail: %v", err) } @@ -170,7 +170,7 @@ func handleScanImageResponseBodyDataResults(ctrlBlock *ImageAuditTaskBatchContro // 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 logger.Info("Recording the action result...") if isActionCompleted { - if err = finalizeTask(action); err != nil { + if err = finalizeTask(ctx, action); err != nil { logger.Error("finalizeTask fail: %v", err) } } @@ -179,8 +179,7 @@ func handleScanImageResponseBodyDataResults(ctrlBlock *ImageAuditTaskBatchContro return } -func handleImageAudit(dataId string, result *imageaudit.ScanImageResponseBodyDataResults) (pass bool, imageaudit *dbstruct.ImageAudit, err error) { - ctx := &gin.Context{} +func handleImageAudit(ctx *gin.Context, dataId string, result *imageaudit.ScanImageResponseBodyDataResults) (pass bool, imageaudit *dbstruct.ImageAudit, err error) { imageaudit = &dbstruct.ImageAudit{ Id: goproto.String(dataId), } @@ -219,7 +218,7 @@ func handleTaskAction(task *ImageAuditTaskControlBlock, actionMap map[string]*Im } // 等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 -func finalizeTask(action *ImageAuditAction) (err error) { +func finalizeTask(ctx *gin.Context, action *ImageAuditAction) (err error) { lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0 // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务和它的索引 passTaskIds := make([]string, 0) // 所有已审核成功的任务的taskId @@ -249,22 +248,22 @@ func finalizeTask(action *ImageAuditAction) (err error) { // 判定任务链终态,终态非成功,执行回退操作,记录到数据库中 if !action.IsPassed() { - if err = executeRollBack(lastValidTask.ImageAuditTask); err != nil { + if err = executeRollBack(ctx, lastValidTask.ImageAuditTask); err != nil { return } } // 更新所有任务状态 - if err = updatePassedTasks(passTaskIds); err != nil { + if err = updatePassedTasks(ctx, passTaskIds); err != nil { return } - if err = updateExpiredTasks(expiredTaskIds); err != nil { + if err = updateExpiredTasks(ctx, expiredTaskIds); err != nil { return } // 终态成功,执行成功后操作 if action.IsPassed() { - if err = handleSuccess(action.TaskChain[len(action.TaskChain)-1].ImageAuditTask); err != nil { + if err = handleSuccess(ctx, action.TaskChain[len(action.TaskChain)-1].ImageAuditTask); err != nil { return } } @@ -339,8 +338,7 @@ func copyScanResultInfo(imageaudit *dbstruct.ImageAudit, result *imageaudit.Scan return } -func executeRollBack(lastValidTask *dbstruct.ImageAuditTask) (err error) { - ctx := &gin.Context{} +func executeRollBack(ctx *gin.Context, lastValidTask *dbstruct.ImageAuditTask) (err error) { lastValidTask.Status = goproto.Int64(consts.ImageAudit_Rollbacked) if err = _DefaultResultHandler.Handle(ctx, lastValidTask, consts.ImageAuditTaskUpdate_Rollback); err != nil { logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.Id), err) @@ -367,8 +365,7 @@ func executeRollBack(lastValidTask *dbstruct.ImageAuditTask) (err error) { return } -func handleSuccess(task *dbstruct.ImageAuditTask) (err error) { - ctx := &gin.Context{} +func handleSuccess(ctx *gin.Context, task *dbstruct.ImageAuditTask) (err error) { task.Status = goproto.Int64(consts.ImageAudit_Passed) if err = _DefaultResultHandler.Handle(ctx, task, consts.ImageAuditTaskUpdate_Success); err != nil { logger.Error("Handle success taskId:%v fail:%v", util.DerefString(task.Id), err) @@ -386,8 +383,7 @@ func handleSuccess(task *dbstruct.ImageAuditTask) (err error) { return } -func updatePassedTasks(passTaskIds []string) (err error) { - ctx := &gin.Context{} +func updatePassedTasks(ctx *gin.Context, passTaskIds []string) (err error) { if err = _DefaultImageAuditTask.OpUpdateByIds(ctx, &imageaudittaskproto.OpUpdateByIdsReq{ ImageAuditTask: &dbstruct.ImageAuditTask{ Status: goproto.Int64(consts.ImageAudit_Passed), @@ -401,8 +397,7 @@ func updatePassedTasks(passTaskIds []string) (err error) { return } -func updateExpiredTasks(expiredTaskIds []string) (err error) { - ctx := &gin.Context{} +func updateExpiredTasks(ctx *gin.Context, expiredTaskIds []string) (err error) { if err = _DefaultImageAuditTask.OpUpdateByIds(ctx, &imageaudittaskproto.OpUpdateByIdsReq{ ImageAuditTask: &dbstruct.ImageAuditTask{ Status: goproto.Int64(consts.ImageAudit_Expired), @@ -416,9 +411,8 @@ func updateExpiredTasks(expiredTaskIds []string) (err error) { return } -func handleBatchError(ctrlBlock *ImageAuditTaskBatchControlBlock, _err error) (err error) { +func handleBatchError(ctx *gin.Context, ctrlBlock *ImageAuditTaskBatchControlBlock, _err error) (err error) { logger.Info("All tasks of this batchId: %v has failed, rolling back...", ctrlBlock.BatchId) - ctx := &gin.Context{} if err = _DefaultImageAudit.OpUpdateByBatchId(ctx, ctrlBlock.BatchId, &dbstruct.ImageAudit{ Status: goproto.Int64(consts.ImageAudit_ServiceFailed), Remarks: goproto.String("批次任务失败,原因详见task表"), diff --git a/library/contentaudit/textaudit/client.go b/library/contentaudit/textaudit/client.go index 31e40033..9f74d157 100644 --- a/library/contentaudit/textaudit/client.go +++ b/library/contentaudit/textaudit/client.go @@ -62,7 +62,7 @@ func Init(cfg *configcenter.TextAuditConfig) (err error) { // 初始化审核选项 labels = strings.Split(cfg.Labels, " ") - _, err = RefreshBatchId() + _, err = RefreshBatchId(&gin.Context{}) if err != nil { logger.Error("RefreshBatchId fail, err: %v", err) return diff --git a/library/contentaudit/textaudit/control_block.go b/library/contentaudit/textaudit/control_block.go index 23f7c3c2..244f7b1a 100644 --- a/library/contentaudit/textaudit/control_block.go +++ b/library/contentaudit/textaudit/control_block.go @@ -17,7 +17,7 @@ type TextAuditTaskBatchControlBlock struct { ActionMap map[string]*TextAuditAction // 动作Id号-action的map } -func NewTextAuditTaskBatchControlBlock(tasks []*dbstruct.TextAuditTask, batchId string) *TextAuditTaskBatchControlBlock { +func NewTextAuditTaskBatchControlBlock(ctx *gin.Context, tasks []*dbstruct.TextAuditTask, batchId string) *TextAuditTaskBatchControlBlock { if len(tasks) == 0 { return nil } @@ -30,14 +30,13 @@ func NewTextAuditTaskBatchControlBlock(tasks []*dbstruct.TextAuditTask, batchId for _, task := range tasks { taskCtrlBlock := NewTextAuditTaskControlBlock(task) ctrlBlock.TaskCtrlBlocks = append(ctrlBlock.TaskCtrlBlocks, taskCtrlBlock) - ctrlBlock.RecordAction(taskCtrlBlock) + ctrlBlock.RecordAction(ctx, taskCtrlBlock) } return ctrlBlock } -func (ctrlBlock *TextAuditTaskBatchControlBlock) RecordAction(taskCtrlBlock *TextAuditTaskControlBlock) { - ctx := &gin.Context{} +func (ctrlBlock *TextAuditTaskBatchControlBlock) RecordAction(ctx *gin.Context, taskCtrlBlock *TextAuditTaskControlBlock) { // 写map记录 if ctrlBlock.ActionMap[taskCtrlBlock.ActionId] == nil { // 写入ctrlBlock.ActionMap ctrlBlock.ActionMap[taskCtrlBlock.ActionId] = NewTextAuditAction() @@ -66,8 +65,8 @@ type TextAuditTaskControlBlock struct { // 新建文字审核任务块 func NewTextAuditTaskControlBlock(task *dbstruct.TextAuditTask) (tcb *TextAuditTaskControlBlock) { tcb = &TextAuditTaskControlBlock{ - ActionId: fmt.Sprintf("%v%v%v%v", util.DerefString(task.AssociativeDatabase), util.DerefString(task.AssociativeTableName), - util.DerefInt64(task.AssociativeTableId), util.DerefString(task.AssociativeTableColumn)), + ActionId: fmt.Sprintf("%v%v%v%v", task.GetAssociativeDatabase(), task.GetAssociativeTableName(), + task.GetAssociativeTableId(), task.GetAssociativeTableColumn()), TextAuditTask: task, IsTaskPassed: true, IsGivingNoticeToBatch: false, diff --git a/library/contentaudit/textaudit/textaudit.go b/library/contentaudit/textaudit/textaudit.go index fe823415..9327bbe7 100644 --- a/library/contentaudit/textaudit/textaudit.go +++ b/library/contentaudit/textaudit/textaudit.go @@ -17,8 +17,8 @@ import ( ) // 刷新批次号 -func RefreshBatchId() (string, error) { - batchId, err := _DefaultContentAuditRTI.GetAndUpdateTextAuditBatchId(&gin.Context{}, genereteBatchId()) +func RefreshBatchId(ctx *gin.Context) (string, error) { + batchId, err := _DefaultContentAuditRTI.GetAndUpdateTextAuditBatchId(ctx, genereteBatchId()) if err != nil { logger.Info("_DefaultContentAuditRTI GetAndUpdateTextAuditBatchId fail: %v", err) return "", err @@ -26,10 +26,10 @@ func RefreshBatchId() (string, error) { return batchId, nil } -func Run(batchId string) (successNum int, failNum int, err error) { +func Run(ctx *gin.Context, batchId string) (successNum int, failNum int, err error) { // 查询该批次所有审核任务 - textaudittasks, err := _DefaultTextAuditTask.OpList(&gin.Context{}, &textaudittaskproto.OpListReq{ + textaudittasks, err := _DefaultTextAuditTask.OpList(ctx, &textaudittaskproto.OpListReq{ BatchId: goproto.String(batchId), Status: goproto.Int64(consts.TextAudit_Created), Sort: "ct", @@ -44,7 +44,7 @@ func Run(batchId string) (successNum int, failNum int, err error) { logger.Info("Text audit batch started, batchId : %v, task number : %v", batchId, len(textaudittasks)) // 1.构建批量任务控制块,初始化必要的actionMap、填充送审text等信息 - ctrlBlock := NewTextAuditTaskBatchControlBlock(textaudittasks, batchId) + ctrlBlock := NewTextAuditTaskBatchControlBlock(ctx, textaudittasks, batchId) // 2.创建请求 reqs, err := createScanTextRequest(ctrlBlock) @@ -72,7 +72,7 @@ func Run(batchId string) (successNum int, failNum int, err error) { } // 4.处理应答 - err = handleScanTextResponseBodyDataElements(ctrlBlock, results) + err = handleScanTextResponseBodyDataElements(ctx, ctrlBlock, results) successNum = len(textaudittasks) return @@ -115,7 +115,7 @@ func createScanTextRequest(ctrlBlock *TextAuditTaskBatchControlBlock) (requests return } -func handleScanTextResponseBodyDataElements(ctrlBlock *TextAuditTaskBatchControlBlock, results []*textaudit.ScanTextResponseBodyDataElements) (err error) { +func handleScanTextResponseBodyDataElements(ctx *gin.Context, ctrlBlock *TextAuditTaskBatchControlBlock, results []*textaudit.ScanTextResponseBodyDataElements) (err error) { taskCtrlBlocks := ctrlBlock.TaskCtrlBlocks actionMap := ctrlBlock.ActionMap for i, result := range results { @@ -123,7 +123,7 @@ func handleScanTextResponseBodyDataElements(ctrlBlock *TextAuditTaskBatchControl action := &TextAuditAction{} // 1.立即在textaudit-textaudit中更新该次审核结果 - pass, textaudit, err := handleTextAudit(taskCtrlBlocks[i].TextAuditTask.TextAuditId, result) + pass, textaudit, err := handleTextAudit(ctx, taskCtrlBlocks[i].TextAuditTask.TextAuditId, result) if err != nil { logger.Error("handleTextAudit fail: %v", err) } @@ -137,7 +137,7 @@ func handleScanTextResponseBodyDataElements(ctrlBlock *TextAuditTaskBatchControl // 4.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 if isActionCompleted { - if err = finalizeTask(action); err != nil { + if err = finalizeTask(ctx, action); err != nil { logger.Error("finalizeTask fail: %v", err) } } @@ -145,8 +145,7 @@ func handleScanTextResponseBodyDataElements(ctrlBlock *TextAuditTaskBatchControl return } -func handleTextAudit(id *string, result *textaudit.ScanTextResponseBodyDataElements) (pass bool, textaudit *dbstruct.TextAudit, err error) { - ctx := &gin.Context{} +func handleTextAudit(ctx *gin.Context, id *string, result *textaudit.ScanTextResponseBodyDataElements) (pass bool, textaudit *dbstruct.TextAudit, err error) { textaudit = &dbstruct.TextAudit{ Id: id, } @@ -175,7 +174,7 @@ func handleTaskAction(task *TextAuditTaskControlBlock, actionMap map[string]*Tex } // 等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 -func finalizeTask(action *TextAuditAction) (err error) { +func finalizeTask(ctx *gin.Context, action *TextAuditAction) (err error) { lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0 // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务及其索引 passTaskIds := make([]string, 0) // 所有已审核成功的任务的taskId expiredTaskIds := make([]string, 0) // 所有已失效任务的taskId,失效任务是指那些前驱任务依然失败的失败任务,这些任务已失去实际意义,不可以用来回退 @@ -203,22 +202,22 @@ func finalizeTask(action *TextAuditAction) (err error) { } // 判定任务链终态,终态非成功,执行回退操作,记录到数据库中 if !action.IsPassed() { - if err = executeRollBack(lastValidTask.TextAuditTask); err != nil { + if err = executeRollBack(ctx, lastValidTask.TextAuditTask); err != nil { return } } // 更新所有任务状态 - if err = updatePassedTasks(passTaskIds); err != nil { + if err = updatePassedTasks(ctx, passTaskIds); err != nil { return } - if err = updateExpiredTasks(expiredTaskIds); err != nil { + if err = updateExpiredTasks(ctx, expiredTaskIds); err != nil { return } // 终态成功,执行成功后操作 if action.IsPassed() { - if err = handleSuccess(action.TaskChain[len(action.TaskChain)-1].TextAuditTask); err != nil { + if err = handleSuccess(ctx, action.TaskChain[len(action.TaskChain)-1].TextAuditTask); err != nil { return } } @@ -290,8 +289,7 @@ func parseDetailsContexts(contexts []*textaudit.ScanTextResponseBodyDataElements return ctxs } -func executeRollBack(lastValidTask *dbstruct.TextAuditTask) (err error) { - ctx := &gin.Context{} +func executeRollBack(ctx *gin.Context, lastValidTask *dbstruct.TextAuditTask) (err error) { lastValidTask.Status = goproto.Int64(consts.TextAudit_Rollbacked) if err = _DefaultResultHandler.Handle(ctx, lastValidTask, consts.TextAuditTaskUpdate_Rollback); err != nil { logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.Id), err) @@ -318,8 +316,7 @@ func executeRollBack(lastValidTask *dbstruct.TextAuditTask) (err error) { return } -func handleSuccess(task *dbstruct.TextAuditTask) (err error) { - ctx := &gin.Context{} +func handleSuccess(ctx *gin.Context, task *dbstruct.TextAuditTask) (err error) { task.Status = goproto.Int64(consts.TextAudit_Passed) if err = _DefaultResultHandler.Handle(ctx, task, consts.TextAuditTaskUpdate_Success); err != nil { logger.Error("Handle success taskId:%v fail:%v", util.DerefString(task.Id), err) @@ -337,8 +334,7 @@ func handleSuccess(task *dbstruct.TextAuditTask) (err error) { return } -func updatePassedTasks(passTaskIds []string) (err error) { - ctx := &gin.Context{} +func updatePassedTasks(ctx *gin.Context, passTaskIds []string) (err error) { if err = _DefaultTextAuditTask.OpUpdateByIds(ctx, &textaudittaskproto.OpUpdateByIdsReq{ TextAuditTask: &dbstruct.TextAuditTask{ Status: goproto.Int64(consts.TextAudit_Passed), @@ -351,8 +347,7 @@ func updatePassedTasks(passTaskIds []string) (err error) { return } -func updateExpiredTasks(expiredTaskIds []string) (err error) { - ctx := &gin.Context{} +func updateExpiredTasks(ctx *gin.Context, expiredTaskIds []string) (err error) { if err = _DefaultTextAuditTask.OpUpdateByIds(ctx, &textaudittaskproto.OpUpdateByIdsReq{ TextAuditTask: &dbstruct.TextAuditTask{ Status: goproto.Int64(consts.TextAudit_Expired), diff --git a/library/contentaudit/video_moderation/client.go b/library/contentaudit/video_moderation/client.go index 0876be5e..f4bd232c 100644 --- a/library/contentaudit/video_moderation/client.go +++ b/library/contentaudit/video_moderation/client.go @@ -72,7 +72,7 @@ func Init(cfg *configcenter.VideoModerationConfig) (err error) { return } - _, err = RefreshBatchId() + _, err = RefreshBatchId(&gin.Context{}) if err != nil { logger.Error("RefreshBatchId fail, cfg: %v, err: %v", util.ToJson(config), err) return diff --git a/library/contentaudit/video_moderation/control_block.go b/library/contentaudit/video_moderation/control_block.go index 64a93738..05082ad2 100644 --- a/library/contentaudit/video_moderation/control_block.go +++ b/library/contentaudit/video_moderation/control_block.go @@ -20,8 +20,7 @@ type VideoModerationTaskBatchControlBlock struct { FinishedTaskNum int64 // 已完成任务数量 } -func (ctrlBlock *VideoModerationTaskBatchControlBlock) RecordAction(taskCtrlBlock *VideoModerationTaskControlBlock) { - ctx := &gin.Context{} +func (ctrlBlock *VideoModerationTaskBatchControlBlock) RecordAction(ctx *gin.Context, taskCtrlBlock *VideoModerationTaskControlBlock) { // 写map记录 if ctrlBlock.ActionMap[taskCtrlBlock.ActionId] == nil { // 写入ctrlBlock.ActionMap ctrlBlock.ActionMap[taskCtrlBlock.ActionId] = NewVideoModerationAction() diff --git a/library/contentaudit/video_moderation/handler.go b/library/contentaudit/video_moderation/handler.go index b5c47d0b..fd56d9d6 100644 --- a/library/contentaudit/video_moderation/handler.go +++ b/library/contentaudit/video_moderation/handler.go @@ -17,7 +17,7 @@ import ( goproto "google.golang.org/protobuf/proto" ) -func HandleVideoModerationContent(content string) (err error) { +func HandleVideoModerationContent(ctx *gin.Context, content string) (err error) { // 获取ResponseBody,解析出batchId和视频审核表id logger.Info("Unmarshaling ResponseBody...") result := &green20220302.VideoModerationResultResponseBody{} @@ -27,11 +27,11 @@ func HandleVideoModerationContent(content string) (err error) { return } - err = handleVideoModerationResultResponseBody(result) + err = handleVideoModerationResultResponseBody(ctx, result) return } -func handleVideoModerationResultResponseBody(result *green20220302.VideoModerationResultResponseBody) (err error) { +func handleVideoModerationResultResponseBody(ctx *gin.Context, result *green20220302.VideoModerationResultResponseBody) (err error) { isTaskCompleted := false isActionCompleted := false @@ -39,7 +39,7 @@ func handleVideoModerationResultResponseBody(result *green20220302.VideoModerati // 1.立即在视频审核表更新该次审核结果 logger.Info("Handling its moderation record...") - isPassed, videomoderation, err := handleVideoModeration(result) + isPassed, videomoderation, err := handleVideoModeration(ctx, result) if err != nil { logger.Error("handleVideoModeration fail: %v", err) } @@ -74,7 +74,7 @@ func handleVideoModerationResultResponseBody(result *green20220302.VideoModerati // 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 logger.Info("Recording the action result...") if isActionCompleted { - if err = finalizeTask(action); err != nil { + if err = finalizeTask(ctx, action); err != nil { logger.Error("finalizeTask fail: %v", err) } logger.Info("video moderation task of id %v has finished...", tcb.VideoModerationTask.GetId()) @@ -91,7 +91,7 @@ func handleVideoModerationResultResponseBody(result *green20220302.VideoModerati return } -func handleVideoModeration(result *green20220302.VideoModerationResultResponseBody) (isPassed bool, videomoderation *dbstruct.VideoModeration, err error) { +func handleVideoModeration(ctx *gin.Context, result *green20220302.VideoModerationResultResponseBody) (isPassed bool, videomoderation *dbstruct.VideoModeration, err error) { dataId := util.DerefString(result.Data.DataId) batchId := dataId[:14] @@ -140,7 +140,7 @@ func handleVideoModeration(result *green20220302.VideoModerationResultResponseBo } } - if err = _DefaultVideoModeration.OpUpdate(&gin.Context{}, &video_moderation_proto.OpUpdateReq{ + if err = _DefaultVideoModeration.OpUpdate(ctx, &video_moderation_proto.OpUpdateReq{ VideoModeration: videomoderation, }); err != nil { logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err) @@ -286,7 +286,7 @@ func handleTaskAction(task *VideoModerationTaskControlBlock, actionMap map[strin } // 等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 -func finalizeTask(action *VideoModerationAction) (err error) { +func finalizeTask(ctx *gin.Context, action *VideoModerationAction) (err error) { lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0 // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务和它的索引 passTaskIds := make([]string, 0) // 所有已审核成功的任务的taskId @@ -316,22 +316,22 @@ func finalizeTask(action *VideoModerationAction) (err error) { // 判定任务链终态,终态非成功,执行回退操作,记录到数据库中 if !action.IsPassed() { - if err = executeRollBack(lastValidTask.VideoModerationTask); err != nil { + if err = executeRollBack(ctx, lastValidTask.VideoModerationTask); err != nil { return } } // 更新所有任务状态 - if err = updatePassedTasks(passTaskIds); err != nil { + if err = updatePassedTasks(ctx, passTaskIds); err != nil { return } - if err = updateExpiredTasks(expiredTaskIds); err != nil { + if err = updateExpiredTasks(ctx, expiredTaskIds); err != nil { return } // 终态成功,执行成功后操作 if action.IsPassed() { - if err = handleSuccess(action.TaskChain[len(action.TaskChain)-1].VideoModerationTask); err != nil { + if err = handleSuccess(ctx, action.TaskChain[len(action.TaskChain)-1].VideoModerationTask); err != nil { return } } @@ -341,8 +341,7 @@ func finalizeTask(action *VideoModerationAction) (err error) { return } -func executeRollBack(lastValidTask *dbstruct.VideoModerationTask) (err error) { - ctx := &gin.Context{} +func executeRollBack(ctx *gin.Context, lastValidTask *dbstruct.VideoModerationTask) (err error) { lastValidTask.Status = goproto.Int64(consts.VideoModeration_Rollbacked) if err = _DefaultResultHandler.Handle(ctx, lastValidTask, consts.VideoModerationTaskUpdate_Rollback); err != nil { logger.Error("Roll back taskId:%v fail:%v", lastValidTask.GetId(), err) @@ -371,8 +370,7 @@ func executeRollBack(lastValidTask *dbstruct.VideoModerationTask) (err error) { return } -func handleSuccess(task *dbstruct.VideoModerationTask) (err error) { - ctx := &gin.Context{} +func handleSuccess(ctx *gin.Context, task *dbstruct.VideoModerationTask) (err error) { task.Status = goproto.Int64(consts.VideoModeration_Passed) if err = _DefaultResultHandler.Handle(ctx, task, consts.VideoModerationTaskUpdate_Success); err != nil { logger.Error("Handle success taskId:%v fail:%v", task.GetId(), err) @@ -390,8 +388,7 @@ func handleSuccess(task *dbstruct.VideoModerationTask) (err error) { return } -func updatePassedTasks(passTaskIds []string) (err error) { - ctx := &gin.Context{} +func updatePassedTasks(ctx *gin.Context, passTaskIds []string) (err error) { if err = _DefaultVideoModerationTask.OpUpdateByIds(ctx, &video_moderation_task_proto.OpUpdateByIdsReq{ VideoModerationTask: &dbstruct.VideoModerationTask{ Status: goproto.Int64(consts.VideoModeration_Passed), @@ -405,8 +402,7 @@ func updatePassedTasks(passTaskIds []string) (err error) { return } -func updateExpiredTasks(expiredTaskIds []string) (err error) { - ctx := &gin.Context{} +func updateExpiredTasks(ctx *gin.Context, expiredTaskIds []string) (err error) { if err = _DefaultVideoModerationTask.OpUpdateByIds(ctx, &video_moderation_task_proto.OpUpdateByIdsReq{ VideoModerationTask: &dbstruct.VideoModerationTask{ Status: goproto.Int64(consts.VideoModeration_Expired), diff --git a/library/contentaudit/video_moderation/video_moderation.go b/library/contentaudit/video_moderation/video_moderation.go index f5dc695c..bbf14dc8 100644 --- a/library/contentaudit/video_moderation/video_moderation.go +++ b/library/contentaudit/video_moderation/video_moderation.go @@ -21,8 +21,8 @@ import ( ) // 刷新批次号 -func RefreshBatchId() (string, error) { - batchId, err := _DefaultContentAuditRTI.GetAndUpdateVideoModerationBatchId(&gin.Context{}, genereteBatchId()) +func RefreshBatchId(ctx *gin.Context) (string, error) { + batchId, err := _DefaultContentAuditRTI.GetAndUpdateVideoModerationBatchId(ctx, genereteBatchId()) if err != nil { logger.Info("_DefaultContentAuditRTI GetAndUpdateVideoModerationBatchId fail: %v", err) return "", err @@ -31,10 +31,10 @@ func RefreshBatchId() (string, error) { } // 视频审核主逻辑 -func Run(batchId string) (successNum int, failNum int, err error) { +func Run(ctx *gin.Context, batchId string) (successNum int, failNum int, err error) { // 1.查询该批次所有审核任务 - videoModerationTasks, err := _DefaultVideoModerationTask.OpList(&gin.Context{}, &video_moderation_task_proto.OpListReq{ + videoModerationTasks, err := _DefaultVideoModerationTask.OpList(ctx, &video_moderation_task_proto.OpListReq{ BatchId: goproto.String(batchId), Status: goproto.Int64(consts.VideoModeration_Created), Sort: "ct", @@ -50,7 +50,7 @@ func Run(batchId string) (successNum int, failNum int, err error) { logger.Info("Video moderation batch started, batchId : %v, task number : %v", batchId, len(videoModerationTasks)) // 2.创建批量任务控制块,获取待审核的视频及其dataId(视频审核表Id),将批量任务控制块暂时保存 - btcb, videos, dataIds := createVideoModerationTaskBatchControlBlock(videoModerationTasks) + btcb, videos, dataIds := createVideoModerationTaskBatchControlBlock(ctx, videoModerationTasks) btcb.BatchId = batchId defaultVideoModerationTaskScheduler.btcbMp[btcb.BatchId] = btcb @@ -64,14 +64,14 @@ func Run(batchId string) (successNum int, failNum int, err error) { } for i, req := range reqs { _result, err := defaultVideoModerationClient.VideoModerationWithOptions(req, runtime) - handleVideoModerationResponse(_result, err, dataIds[i]) + handleVideoModerationResponse(ctx, _result, err, dataIds[i]) } successNum = len(videoModerationTasks) return } -func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModerationTask) (batchTaskCtrlBlock *VideoModerationTaskBatchControlBlock, videos []*dbstruct.MediaComponent, dataIds []string) { +func createVideoModerationTaskBatchControlBlock(ctx *gin.Context, tasks []*dbstruct.VideoModerationTask) (batchTaskCtrlBlock *VideoModerationTaskBatchControlBlock, videos []*dbstruct.MediaComponent, dataIds []string) { // 填充媒体,获取url和dataId,创建action信息 videos = make([]*dbstruct.MediaComponent, 0) mediaFillables := make([]mediafiller.MediaFillable, 0) @@ -96,13 +96,13 @@ func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModeratio batchTaskCtrlBlock.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = tcb batchTaskCtrlBlock.VidmodId2taskMap[task.GetVideoModerationId()] = tcb - batchTaskCtrlBlock.RecordAction(tcb) + batchTaskCtrlBlock.RecordAction(ctx, tcb) } else { // 已分片 videoIds := task.AuditedMedia.GetVideoIds() videoAuditIds := task.GetVideoModerationFragmentIds() batchTaskCtrlBlock.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = tcb - batchTaskCtrlBlock.RecordAction(tcb) + batchTaskCtrlBlock.RecordAction(ctx, tcb) for i, videoId := range videoIds { video := &dbstruct.MediaComponent{ VideoIds: util.Int64Slice([]int64{videoId}), @@ -116,7 +116,7 @@ func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModeratio } } - mediafiller.FillListInternal(&gin.Context{}, mediaFillables) + mediafiller.FillListInternal(ctx, mediaFillables) return } @@ -142,7 +142,7 @@ func createVideoModerationRequest(videos []*dbstruct.MediaComponent, dataIds []s return } -func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, err error, dataId string) { +func handleVideoModerationResponse(ctx *gin.Context, resp *green20220302.VideoModerationResponse, err error, dataId string) { logger.Info("Receive the response from VideoModerationResponse: %v", resp.String()) @@ -168,7 +168,7 @@ func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, } if !isSuccess { - rErr := handleVideoModerationResultResponseBody(&green20220302.VideoModerationResultResponseBody{ + rErr := handleVideoModerationResultResponseBody(ctx, &green20220302.VideoModerationResultResponseBody{ Code: goproto.Int32(code), Message: goproto.String(msg), Data: &green20220302.VideoModerationResultResponseBodyData{