From edb5caf001b155e9e948d37ccacc37e23b47f706 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Sat, 27 Apr 2024 11:14:44 +0800 Subject: [PATCH] by Robin at 20240427 --- .../controller/video_moderation_callback.go | 4 +- app/mix/service/cronservice.go | 1 + app/mix/service/xxljob_tasks.go | 9 ++ etc/mix/mix-test.yaml | 2 +- .../contentaudit/video_moderation/consts.go | 2 + .../video_moderation/control_block.go | 11 ++ .../contentaudit/video_moderation/handler.go | 112 ++++++------------ .../video_moderation/taskscheduler.go | 22 +++- .../video_moderation/video_moderation.go | 29 ++++- 9 files changed, 108 insertions(+), 84 deletions(-) diff --git a/app/mix/controller/video_moderation_callback.go b/app/mix/controller/video_moderation_callback.go index e374cbf9..d11c32a7 100644 --- a/app/mix/controller/video_moderation_callback.go +++ b/app/mix/controller/video_moderation_callback.go @@ -18,9 +18,9 @@ func VideoModerationCallback(ctx *gin.Context) { return } - err = videomoderation.HandleVideoModerationResult(req.Content) + err = videomoderation.HandleVideoModerationContent(req.Content) if err != nil { - logger.Error("HandleVideoModerationResult fail, req: %v, err: %v", util.ToJson(req), err) + logger.Error("HandleVideoModerationContent fail, req: %v, err: %v", util.ToJson(req), err) return } diff --git a/app/mix/service/cronservice.go b/app/mix/service/cronservice.go index 7dc68e12..036afb4c 100644 --- a/app/mix/service/cronservice.go +++ b/app/mix/service/cronservice.go @@ -61,6 +61,7 @@ func (s *CronService) Init(c any) (err error) { exec.RegTask("clear_wrong_pswd_times", s.ClearWrongPswdTimes) exec.RegTask("video_moderation_batch", s.VideoModerationBatch) exec.RegTask("video_moderation_batch_his", s.VideoModerationBatchHis) + exec.RegTask("clear_expired_btcb", s.ClearExpiredBtcb) exec.LogHandler(customLogHandle) //注册任务handler diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go index 0e017917..0adea114 100644 --- a/app/mix/service/xxljob_tasks.go +++ b/app/mix/service/xxljob_tasks.go @@ -482,3 +482,12 @@ func (s *CronService) VideoModerationBatchHis(ctx context.Context, param *xxl.Ru logger.Info("Video moderation batch ends...") return handleMsg.String() } + +func (s *CronService) ClearExpiredBtcb(ctx context.Context, param *xxl.RunReq) (msg string) { + logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) + + videomoderation.ClearExpiredBtcb() + + logger.Info("Clear expired btcb ends...") + return "Clear expired btcb ends..." +} diff --git a/etc/mix/mix-test.yaml b/etc/mix/mix-test.yaml index 4efdf069..fb059527 100644 --- a/etc/mix/mix-test.yaml +++ b/etc/mix/mix-test.yaml @@ -95,7 +95,7 @@ textaudit: video_moderation: access_key_id: "LTAI5tQBGFS9BNiTHdbAAM74" access_key_secret: "vsV42N0ZCtJTQNwFLheCaV8oQn69hQ" - notify_url: "https://62375b61.r7.cpolar.top/ext/video_moderation/callback" + notify_url: "https://api.wishpal.cn/ext/video_moderation/callback" ali_acct_id: "1501583627133461" seed: "23XSkaZsMTz3xdnKtTmx229ZaJ3VS7e" diff --git a/library/contentaudit/video_moderation/consts.go b/library/contentaudit/video_moderation/consts.go index 035b3718..4a744fbd 100644 --- a/library/contentaudit/video_moderation/consts.go +++ b/library/contentaudit/video_moderation/consts.go @@ -98,3 +98,5 @@ var AudioLabelDescMap = map[string]string{ "nontalk": "静音音频", "C_customized": "用户库命中", } + +const SecElapsedBeforeTaskExpires = 600 // 审核任务10分钟后认为过期 diff --git a/library/contentaudit/video_moderation/control_block.go b/library/contentaudit/video_moderation/control_block.go index 2146d14d..ffc639fa 100644 --- a/library/contentaudit/video_moderation/control_block.go +++ b/library/contentaudit/video_moderation/control_block.go @@ -14,6 +14,9 @@ type VideoModerationTaskBatchControlBlock struct { TaskCtrlBlockMap map[string]*VideoModerationTaskControlBlock // 视频审核任务表id->任务控制块map VidmodId2taskMap map[string]*VideoModerationTaskControlBlock // 视频审核表id->任务控制块map ActionMap map[string]*VideoModerationAction // 动作Id号-action的map + Ct int64 // 创建时间戳 + TaskNum int64 // 任务数量 + FinishedTaskNum int64 // 已完成任务数量 } func (ctrlBlock *VideoModerationTaskBatchControlBlock) RecordAction(taskCtrlBlock *VideoModerationTaskControlBlock) { @@ -30,6 +33,14 @@ func (ctrlBlock *VideoModerationTaskBatchControlBlock) RecordAction(taskCtrlBloc ctrlBlock.ActionMap[taskCtrlBlock.ActionId].Record(taskCtrlBlock) } +func (ctrlBlock *VideoModerationTaskBatchControlBlock) IsBatchFinished() bool { + return ctrlBlock.TaskNum <= ctrlBlock.FinishedTaskNum +} + +func (ctrlBlock *VideoModerationTaskBatchControlBlock) FinishATask() { + ctrlBlock.FinishedTaskNum++ +} + // 视频审核任务控制块 // ActionId设计初衷:由于视频审核是定时任务触发的批量作业,如果在一次作业间隔有针对同一个视频媒体的多次更新,则会提交关于它的多次审核,需要保证数据一致性 type VideoModerationTaskControlBlock struct { diff --git a/library/contentaudit/video_moderation/handler.go b/library/contentaudit/video_moderation/handler.go index 0e040419..902bdb16 100644 --- a/library/contentaudit/video_moderation/handler.go +++ b/library/contentaudit/video_moderation/handler.go @@ -17,13 +17,8 @@ import ( goproto "google.golang.org/protobuf/proto" ) -func HandleVideoModerationResult(content string) (err error) { - - isTaskCompleted := false - isActionCompleted := false - action := &VideoModerationAction{} - - // 1.获取ResponseBody,解析出batchId和视频审核表id +func HandleVideoModerationContent(content string) (err error) { + // 获取ResponseBody,解析出batchId和视频审核表id logger.Info("Unmarshaling ResponseBody...") result := &green20220302.VideoModerationResultResponseBody{} err = json.Unmarshal([]byte(content), result) @@ -32,42 +27,65 @@ func HandleVideoModerationResult(content string) (err error) { return } - // 2.立即在视频审核表更新该次审核结果 + err = handleVideoModerationResultResponseBody(result) + return +} + +func handleVideoModerationResultResponseBody(result *green20220302.VideoModerationResultResponseBody) (err error) { + + isTaskCompleted := false + isActionCompleted := false + action := &VideoModerationAction{} + + // 1.立即在视频审核表更新该次审核结果 logger.Info("Handling its moderation record...") isPassed, videomoderation, err := handleVideoModeration(result) if err != nil { logger.Error("handleVideoModeration fail: %v", err) } - // 3.通过batchId从缓存的批次任务控制块里获取信息 + // 2.通过batchId从缓存的批次任务控制块里获取信息 logger.Info("Retriving batch task control block...") - btcb := getBtcb(videomoderation.GetBatchId()) - if btcb == nil { + btcb, ok := getBtcb(videomoderation.GetBatchId()) + if !ok || btcb == nil { // 若任务控制块已不存在,则认为该审核任务已过期 + logger.Info("video moderation batch task of batch_id %v has expired and been removed, you may want to rerun it from xxl-job...") return } - // 4.取出task + // 3.取出task logger.Info("Retriving its moderation task...") - tcb := btcb.VidmodId2taskMap[videomoderation.GetId()] + tcb, ok := btcb.VidmodId2taskMap[videomoderation.GetId()] + if !ok || tcb == nil { + logger.Info("video moderation task of id %v has expired and been removed, this is abnormal, you may want to check your code...") + return + } - // 5.在task中记录本分片结果,并累积已到达的分片数,直到所有分片到达,决定该任务是否成功(非分片任务分片数为1) + // 4.在task中记录本分片结果,并累积已到达的分片数,直到所有分片到达,决定该任务是否成功(非分片任务分片数为1) logger.Info("Recording it to task...") isTaskCompleted = handleTask(tcb, isPassed, videomoderation) - // 6.通过task的actionId,去actionId-[]*task的map查出本批次对该字段的动作链,更新其中关于自己的状态 + // 5.通过task的actionId,去actionId-[]*task的map查出本批次对该字段的动作链,更新其中关于自己的状态 logger.Info("Recording the task result...") if isTaskCompleted { isActionCompleted, action = handleTaskAction(tcb, btcb.ActionMap) } - // 7.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 + // 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 logger.Info("Recording the action result...") if isActionCompleted { if err = finalizeTask(action); err != nil { logger.Error("finalizeTask fail: %v", err) } - btcb.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = nil + logger.Info("video moderation task of id %v has finished...", tcb.VideoModerationTask.GetId()) + + // 如果批次任务已经完成,从map中移除该批次任务控制块 + batchId := btcb.BatchId + btcb.FinishATask() + if btcb.IsBatchFinished() { + removeBtcb(btcb.BatchId) + } + logger.Info("video moderation batch task of batch_id %v has finished and been removed, all referenced request will be regarded as expired...", batchId) } return nil @@ -127,12 +145,12 @@ func handleVideoModeration(result *green20220302.VideoModerationResultResponseBo sb := &strings.Builder{} for _, summary := range videomoderation.GetFrameSummaries() { if summary.GetLabel() != VideoModerationPassLabel { - sb.WriteString(fmt.Sprintf("共检测到%d处%s;的画面", summary.GetLabelSum(), FrameLabelDescMap[summary.GetLabel()])) + sb.WriteString(fmt.Sprintf("共检测到%d处%s的画面;", summary.GetLabelSum(), FrameLabelDescMap[summary.GetLabel()])) } } for _, summary := range videomoderation.GetAudioSummaries() { if summary.GetLabel() != AudioModerationPassLabel { - sb.WriteString(fmt.Sprintf("共检测到%d处%s;的音频", summary.GetLabelSum(), AudioLabelDescMap[summary.GetLabel()])) + sb.WriteString(fmt.Sprintf("共检测到%d处%s的音频;", summary.GetLabelSum(), AudioLabelDescMap[summary.GetLabel()])) } } videomoderation.Description = sb.String() @@ -379,59 +397,3 @@ func updateExpiredTasks(expiredTaskIds []string) (err error) { } return } - -// func handleVideoModerationError(dataId string, batchId string, msg string) (err error) { - -// ctx := &gin.Context{} -// if err = _DefaultVideoModeration.OpUpdate(ctx, &video_moderation_proto.OpUpdateReq{ -// VideoModeration: &dbstruct.VideoModeration{ -// Id: goproto.String(dataId), -// Status: goproto.Int64(consts.VideoModeration_ServiceFailed), -// Remarks: goproto.String("批次任务失败,原因详见task表"), -// }, -// }); err != nil { -// logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err) -// return -// } - -// // 先看是否已经批次失败 -// btcb := getBtcb(batchId) -// if btcb == nil { -// logger.Error("This video moderation task batch: %v has expired, you may want to rerun it", batchId) -// return -// } - -// tcb := btcb.VidmodId2taskMap[dataId] -// if !tcb.Failed { -// tcb.Failed = true -// task := tcb.VideoModerationTask - -// if err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{ -// VideoModerationTask: &dbstruct.VideoModerationTask{ -// Id: tcb.VideoModerationTask.Id, -// Status: goproto.Int64(consts.VideoModeration_ServiceFailed), -// Remarks: goproto.String(msg), -// }, -// }); err != nil { -// logger.Error("_DefaultVideoModerationTask OpUpdate fail: %v\n", err) -// return -// } - -// task.Status = goproto.Int64(consts.VideoModeration_ServiceFailed) -// if err = _DefaultResultHandler.Handle(ctx, task, consts.VideoModerationTaskUpdate_Rollback); err != nil { -// if err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{ -// VideoModerationTask: &dbstruct.VideoModerationTask{ -// Id: task.Id, -// Status: goproto.Int64(consts.VideoModeration_Failed), -// Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"), -// }, -// }); err != nil { -// logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err) -// } -// } - -// btcb.TaskCtrlBlockMap[task.GetId()] = nil -// } - -// return -// } diff --git a/library/contentaudit/video_moderation/taskscheduler.go b/library/contentaudit/video_moderation/taskscheduler.go index 431c9097..a96eba00 100644 --- a/library/contentaudit/video_moderation/taskscheduler.go +++ b/library/contentaudit/video_moderation/taskscheduler.go @@ -3,6 +3,7 @@ package videomoderation import ( "fmt" "service/library/configcenter" + "service/library/logger" "time" ) @@ -33,11 +34,26 @@ func genereteBatchId() string { } // 获取批次审核任务控制块 -func getBtcb(batchId string) *VideoModerationTaskBatchControlBlock { - return defaultVideoModerationTaskScheduler.btcbMp[batchId] +func getBtcb(batchId string) (*VideoModerationTaskBatchControlBlock, bool) { + for k, v := range defaultVideoModerationTaskScheduler.btcbMp { + logger.Info("batch_id: %v, btcb: %v", k, v) + } + logger.Info("now batch_id: %v", batchId) + btcb, ok := defaultVideoModerationTaskScheduler.btcbMp[batchId] + return btcb, ok } // 移除批次审核任务控制块 func removeBtcb(batchId string) { - defaultVideoModerationTaskScheduler.btcbMp[batchId] = nil + delete(defaultVideoModerationTaskScheduler.btcbMp, batchId) +} + +// 清理过期的批次审核任务控制块 +func ClearExpiredBtcb() { + nowTime := time.Now().Unix() + for _, v := range defaultVideoModerationTaskScheduler.btcbMp { + if nowTime-v.Ct > SecElapsedBeforeTaskExpires { + delete(defaultVideoModerationTaskScheduler.btcbMp, v.BatchId) + } + } } diff --git a/library/contentaudit/video_moderation/video_moderation.go b/library/contentaudit/video_moderation/video_moderation.go index 55aa7444..aa5e0ffd 100644 --- a/library/contentaudit/video_moderation/video_moderation.go +++ b/library/contentaudit/video_moderation/video_moderation.go @@ -9,6 +9,7 @@ import ( "service/dbstruct" "service/library/logger" "service/library/mediafiller" + "time" green20220302 "github.com/alibabacloud-go/green-20220302/client" @@ -76,6 +77,9 @@ func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModeratio TaskCtrlBlockMap: make(map[string]*VideoModerationTaskControlBlock), VidmodId2taskMap: make(map[string]*VideoModerationTaskControlBlock), ActionMap: make(map[string]*VideoModerationAction), + TaskNum: int64(len(tasks)), + FinishedTaskNum: 0, + Ct: time.Now().Unix(), } // 以batchId+视频审核表的Id作为dataId @@ -85,7 +89,7 @@ func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModeratio if task.GetIsFragmented() == 0 { videos = append(videos, task.AuditedMedia) mediaFillables = append(mediaFillables, task.AuditedMedia) - dataIds = append(dataIds, defaultVideoModerationTaskScheduler.batchId+task.GetVideoModerationId()) + dataIds = append(dataIds, task.GetBatchId()+task.GetVideoModerationId()) batchTaskCtrlBlock.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = tcb batchTaskCtrlBlock.VidmodId2taskMap[task.GetVideoModerationId()] = tcb @@ -135,22 +139,41 @@ func createVideoModerationRequest(videos []*dbstruct.MediaComponent, dataIds []s return } -func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, err error, dataId string, batchId string) { +func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, err error, dataId string) { logger.Info("Receive the response from VideoModerationResponse: %v", resp.String()) isSuccess := true + code := int32(http.StatusOK) + msg := "" + + // 针对错误和发送失败的情况,写入信息,移交handler处理 if err != nil { if _t, ok := err.(*tea.SDKError); ok { logger.Error("VideoModerationWithOptions fail, errinfo: %v", util.DerefString(_t.Data)) isSuccess = false + + code = http.StatusInternalServerError + msg = err.Error() } } else if util.DerefInt32(resp.StatusCode) != http.StatusOK { logger.Error("response not success. status: %d", util.DerefInt32(resp.StatusCode)) isSuccess = false + + code = util.DerefInt32(resp.StatusCode) + msg = util.DerefString(resp.Body.Message) } if !isSuccess { - //handleVideoModerationError(dataId, batchId, msg) + rErr := handleVideoModerationResultResponseBody(&green20220302.VideoModerationResultResponseBody{ + Code: goproto.Int32(code), + Message: goproto.String(msg), + Data: &green20220302.VideoModerationResultResponseBodyData{ + DataId: goproto.String(dataId), + }, + }) + if rErr != nil { + logger.Error("handleVideoModerationResultResponseBody fail, err: %v", rErr) + } } }