diff --git a/app/mix/controller/video_moderation_callback.go b/app/mix/controller/video_moderation_callback.go index d11c32a7..d5fabc32 100644 --- a/app/mix/controller/video_moderation_callback.go +++ b/app/mix/controller/video_moderation_callback.go @@ -2,6 +2,7 @@ package controller import ( video_moderation_proto "service/api/proto/video_moderation/proto" + "service/app/mix/service" "service/bizcommon/util" videomoderation "service/library/contentaudit/video_moderation" "service/library/logger" @@ -18,11 +19,19 @@ func VideoModerationCallback(ctx *gin.Context) { return } - err = videomoderation.HandleVideoModerationContent(req.Content) + batchId, isBatchFinished, err := videomoderation.HandleVideoModerationContent(req.Content) if err != nil { logger.Error("HandleVideoModerationContent fail, req: %v, err: %v", util.ToJson(req), err) return } + // 同步批次视频审核结果 + if isBatchFinished { + err = service.DefaultService.SyncVideoModerationTaskResultByBatchId(&gin.Context{}, batchId) + if err != nil { + logger.Error("ERROR : batchId : %v, video moderation tasks of this batchId have failed to syncronize, err :%v", batchId, err) + } + } + ctx.String(200, "success") } diff --git a/app/mix/service/utilservice.go b/app/mix/service/utilservice.go index 49a20bd8..8dbba588 100644 --- a/app/mix/service/utilservice.go +++ b/app/mix/service/utilservice.go @@ -2157,9 +2157,9 @@ func (s *Service) utilSignHvyogoMessage(msg interfaces.HvyogoSignable) ([]byte, return resultBytes, nil } -// 同步图像审核结果 -func (s *Service) utilSyncVideoModerationTaskResultByBatchId(ctx *gin.Context, batchId string) (err error) { - // 查询得到该批次所有有关动态的图像审核任务 +// 同步视频审核结果 +func (s *Service) SyncVideoModerationTaskResultByBatchId(ctx *gin.Context, batchId string) (err error) { + // 查询得到该批次所有有关动态的视频审核任务 videomoderationtasks, err := _DefaultVideoModerationTask.OpList(ctx, &video_moderation_task_proto.OpListReq{ AssociativeDatabase: goproto.String(dao.DBMoment), AssociativeTableName: goproto.String(dao.COLMoment), @@ -2171,7 +2171,7 @@ func (s *Service) utilSyncVideoModerationTaskResultByBatchId(ctx *gin.Context, b return } - // 将有关动态的图像审核任务审核结果同步到动态审核表 + // 将有关动态的视频审核任务审核结果同步到动态审核表 _map := make(map[int64][]string) for _, videomoderationtask := range videomoderationtasks { ids := _map[videomoderationtask.GetStatus()] diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go index c14b6bfb..6ef8b191 100644 --- a/app/mix/service/xxljob_tasks.go +++ b/app/mix/service/xxljob_tasks.go @@ -552,11 +552,6 @@ func (s *CronService) VideoModerationBatch(ctx context.Context, param *xxl.RunRe } else { handleMsg.WriteString(fmt.Sprintf("batchId : %v, video moderation tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)) } - // 同步视频审核结果 - err = DefaultService.utilSyncVideoModerationTaskResultByBatchId(&gin.Context{}, batchId) - if err != nil { - handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, video moderation tasks of this batchId have failed to syncronize, err :%v", batchId, err)) - } logger.Info("Video moderation batch ends...") return handleMsg.String() @@ -574,11 +569,6 @@ func (s *CronService) VideoModerationBatchHis(ctx context.Context, param *xxl.Ru } else { handleMsg.WriteString(fmt.Sprintf("batchId : %v, video moderation tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)) } - // 同步视频审核结果 - err = DefaultService.utilSyncVideoModerationTaskResultByBatchId(&gin.Context{}, batchId) - if err != nil { - handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, video moderation tasks of this batchId have failed to syncronize, err :%v", batchId, err)) - } logger.Info("Video moderation batch ends...") return handleMsg.String() diff --git a/library/contentaudit/video_moderation/handler.go b/library/contentaudit/video_moderation/handler.go index 8df4bf1b..8336f9af 100644 --- a/library/contentaudit/video_moderation/handler.go +++ b/library/contentaudit/video_moderation/handler.go @@ -17,7 +17,7 @@ import ( goproto "google.golang.org/protobuf/proto" ) -func HandleVideoModerationContent(content string) (err error) { +func HandleVideoModerationContent(content string) (batchId string, isBatchFinished bool, err error) { // 获取ResponseBody,解析出batchId和视频审核表id logger.Info("Unmarshaling ResponseBody...") result := &green20220302.VideoModerationResultResponseBody{} @@ -27,11 +27,14 @@ func HandleVideoModerationContent(content string) (err error) { return } - err = handleVideoModerationResultResponseBody(result) + batchId, isBatchFinished, err = handleVideoModerationResultResponseBody(result) return } -func handleVideoModerationResultResponseBody(result *green20220302.VideoModerationResultResponseBody) (err error) { +func handleVideoModerationResultResponseBody(result *green20220302.VideoModerationResultResponseBody) (batchId string, isBatchFinished bool, err error) { + + batchId = "" + isBatchFinished = false isTaskCompleted := false isActionCompleted := false @@ -80,15 +83,16 @@ func handleVideoModerationResultResponseBody(result *green20220302.VideoModerati logger.Info("video moderation task of id %v has finished...", tcb.VideoModerationTask.GetId()) // 如果批次任务已经完成,从map中移除该批次任务控制块 - batchId := btcb.BatchId + batchId = btcb.BatchId btcb.FinishATask() if btcb.IsBatchFinished() { + isBatchFinished = true removeBtcb(btcb.BatchId) } logger.Info("video moderation batch task of batch_id %v has finished and been removed, all referenced request will be regarded as expired...", batchId) } - return nil + return } func handleVideoModeration(result *green20220302.VideoModerationResultResponseBody) (isPassed bool, videomoderation *dbstruct.VideoModeration, err error) {