diff --git a/dbstruct/video_moderation.go b/dbstruct/video_moderation.go index a4da6bb7..dc38c924 100644 --- a/dbstruct/video_moderation.go +++ b/dbstruct/video_moderation.go @@ -70,6 +70,20 @@ func (p *VideoModeration) GetBatchId() string { return *p.BatchId } +func (p *VideoModeration) GetStatus() int64 { + if p == nil || p.Status == nil { + return 0 + } + return *p.Status +} + +func (p *VideoModeration) GetRemarks() string { + if p == nil || p.Remarks == nil { + return "" + } + return *p.Remarks +} + func (p *VideoModeration) GetFrameSummaries() []*FrameSummary { if p == nil || p.FrameSummaries == nil { return make([]*FrameSummary, 0) diff --git a/library/contentaudit/video_moderation/handler.go b/library/contentaudit/video_moderation/handler.go index 072376b3..0e040419 100644 --- a/library/contentaudit/video_moderation/handler.go +++ b/library/contentaudit/video_moderation/handler.go @@ -44,21 +44,21 @@ func HandleVideoModerationResult(content string) (err error) { btcb := getBtcb(videomoderation.GetBatchId()) if btcb == nil { // 若任务控制块已不存在,则认为该审核任务已过期 - + return } // 4.取出task logger.Info("Retriving its moderation task...") - task := btcb.VidmodId2taskMap[videomoderation.GetId()] + tcb := btcb.VidmodId2taskMap[videomoderation.GetId()] // 5.在task中记录本分片结果,并累积已到达的分片数,直到所有分片到达,决定该任务是否成功(非分片任务分片数为1) logger.Info("Recording it to task...") - isTaskCompleted = handleTask(task, isPassed, videomoderation) + isTaskCompleted = handleTask(tcb, isPassed, videomoderation) // 6.通过task的actionId,去actionId-[]*task的map查出本批次对该字段的动作链,更新其中关于自己的状态 logger.Info("Recording the task result...") if isTaskCompleted { - isActionCompleted, action = handleTaskAction(task, btcb.ActionMap) + isActionCompleted, action = handleTaskAction(tcb, btcb.ActionMap) } // 7.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 @@ -67,45 +67,51 @@ func HandleVideoModerationResult(content string) (err error) { if err = finalizeTask(action); err != nil { logger.Error("finalizeTask fail: %v", err) } - // 从批次任务控制块map中移除该控制块 - removeBtcb(videomoderation.GetBatchId()) + btcb.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = nil } return nil } func handleVideoModeration(result *green20220302.VideoModerationResultResponseBody) (isPassed bool, videomoderation *dbstruct.VideoModeration, err error) { - // 阿里云服务失败 - if util.DerefInt32(result.Code) != http.StatusOK { - logger.Error("response not success. status: %d", util.DerefInt32(result.Code)) - } - // 阿里云服务成功 - frameResult := result.Data.FrameResult - audioResult := result.Data.AudioResult - - // 解析信息 dataId := util.DerefString(result.Data.DataId) batchId := dataId[:14] videomoderationId := dataId[14:] - frameSummaries, isFramesPassed := buildFrameSummaries(frameResult.FrameSummarys) - frameDetails := buildFrameDetails(frameResult.Frames) - audioSummaries, isAudioPassed := buildAudioSummaries(audioResult.AudioSummarys) - audioDetails := buildAudioDetails(result.Data.AudioResult.SliceDetails) + isPassed = true videomoderation = &dbstruct.VideoModeration{ - Id: goproto.String(videomoderationId), - FrameNum: frameResult.FrameNum, + Id: goproto.String(videomoderationId), } - videomoderation.SetFrameSummaries(frameSummaries).SetFrameDetails(frameDetails).SetAudioSummaries(audioSummaries).SetAudioDetails(audioDetails) - // 判定检测结果 - if isFramesPassed && isAudioPassed { - videomoderation.Status = goproto.Int64(consts.VideoModeration_Passed) - isPassed = true - } else { - videomoderation.Status = goproto.Int64(consts.VideoModeration_Rejected) + if util.DerefInt32(result.Code) != http.StatusOK { + // 阿里云服务失败 + logger.Error("response not success. status: %d", util.DerefInt32(result.Code)) isPassed = false + videomoderation.Status = goproto.Int64(consts.VideoModeration_ServiceFailed) + videomoderation.Remarks = goproto.String(fmt.Sprintf("任务失败:%v", util.DerefString(result.Message))) + + } else { + // 阿里云服务成功 + frameResult := result.Data.FrameResult + audioResult := result.Data.AudioResult + + // 解析信息 + frameSummaries, isFramesPassed := buildFrameSummaries(frameResult.FrameSummarys) + frameDetails := buildFrameDetails(frameResult.Frames) + audioSummaries, isAudioPassed := buildAudioSummaries(audioResult.AudioSummarys) + audioDetails := buildAudioDetails(result.Data.AudioResult.SliceDetails) + + videomoderation.FrameNum = frameResult.FrameNum + videomoderation.SetFrameSummaries(frameSummaries).SetFrameDetails(frameDetails).SetAudioSummaries(audioSummaries).SetAudioDetails(audioDetails) + + // 判定检测结果 + if isFramesPassed && isAudioPassed { + videomoderation.Status = goproto.Int64(consts.VideoModeration_Passed) + } else { + videomoderation.Status = goproto.Int64(consts.VideoModeration_Rejected) + isPassed = false + } } if err = _DefaultVideoModeration.OpUpdate(&gin.Context{}, &video_moderation_proto.OpUpdateReq{ @@ -116,7 +122,8 @@ func handleVideoModeration(result *green20220302.VideoModerationResultResponseBo } videomoderation.BatchId = goproto.String(batchId) - if isFramesPassed && isAudioPassed { + switch videomoderation.GetStatus() { + case consts.VideoModeration_Rejected: sb := &strings.Builder{} for _, summary := range videomoderation.GetFrameSummaries() { if summary.GetLabel() != VideoModerationPassLabel { @@ -128,6 +135,9 @@ func handleVideoModeration(result *green20220302.VideoModerationResultResponseBo sb.WriteString(fmt.Sprintf("共检测到%d处%s;的音频", summary.GetLabelSum(), AudioLabelDescMap[summary.GetLabel()])) } } + videomoderation.Description = sb.String() + case consts.VideoModeration_ServiceFailed: + videomoderation.Description = videomoderation.GetRemarks() } return @@ -369,3 +379,59 @@ func updateExpiredTasks(expiredTaskIds []string) (err error) { } return } + +// func handleVideoModerationError(dataId string, batchId string, msg string) (err error) { + +// ctx := &gin.Context{} +// if err = _DefaultVideoModeration.OpUpdate(ctx, &video_moderation_proto.OpUpdateReq{ +// VideoModeration: &dbstruct.VideoModeration{ +// Id: goproto.String(dataId), +// Status: goproto.Int64(consts.VideoModeration_ServiceFailed), +// Remarks: goproto.String("批次任务失败,原因详见task表"), +// }, +// }); err != nil { +// logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err) +// return +// } + +// // 先看是否已经批次失败 +// btcb := getBtcb(batchId) +// if btcb == nil { +// logger.Error("This video moderation task batch: %v has expired, you may want to rerun it", batchId) +// return +// } + +// tcb := btcb.VidmodId2taskMap[dataId] +// if !tcb.Failed { +// tcb.Failed = true +// task := tcb.VideoModerationTask + +// if err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{ +// VideoModerationTask: &dbstruct.VideoModerationTask{ +// Id: tcb.VideoModerationTask.Id, +// Status: goproto.Int64(consts.VideoModeration_ServiceFailed), +// Remarks: goproto.String(msg), +// }, +// }); err != nil { +// logger.Error("_DefaultVideoModerationTask OpUpdate fail: %v\n", err) +// return +// } + +// task.Status = goproto.Int64(consts.VideoModeration_ServiceFailed) +// if err = _DefaultResultHandler.Handle(ctx, task, consts.VideoModerationTaskUpdate_Rollback); err != nil { +// if err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{ +// VideoModerationTask: &dbstruct.VideoModerationTask{ +// Id: task.Id, +// Status: goproto.Int64(consts.VideoModeration_Failed), +// Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"), +// }, +// }); err != nil { +// logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err) +// } +// } + +// btcb.TaskCtrlBlockMap[task.GetId()] = nil +// } + +// return +// } diff --git a/library/contentaudit/video_moderation/video_moderation.go b/library/contentaudit/video_moderation/video_moderation.go index e8efbd7f..55aa7444 100644 --- a/library/contentaudit/video_moderation/video_moderation.go +++ b/library/contentaudit/video_moderation/video_moderation.go @@ -47,7 +47,7 @@ func Run(batchId string) (successNum int, failNum int, err error) { // 2.创建批量任务控制块,获取待审核的视频及其dataId(视频审核表Id),将批量任务控制块暂时保存 btcb, videos, dataIds := createVideoModerationTaskBatchControlBlock(videoModerationTasks) - btcb.BatchId = defaultVideoModerationTaskScheduler.batchId + btcb.BatchId = batchId defaultVideoModerationTaskScheduler.btcbMp[btcb.BatchId] = btcb // 3.创建审核请求 @@ -58,9 +58,9 @@ func Run(batchId string) (successNum int, failNum int, err error) { ReadTimeout: tea.Int(10000), ConnectTimeout: tea.Int(10000), } - for _, req := range reqs { + for i, req := range reqs { _result, err := defaultVideoModerationClient.VideoModerationWithOptions(req, runtime) - handleVideoModerationResponse(_result, err) + handleVideoModerationResponse(_result, err, dataIds[i]) } successNum = len(videoModerationTasks) @@ -135,16 +135,22 @@ func createVideoModerationRequest(videos []*dbstruct.MediaComponent, dataIds []s return } -func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, err error) { +func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, err error, dataId string, batchId string) { logger.Info("Receive the response from VideoModerationResponse: %v", resp.String()) + isSuccess := true if err != nil { if _t, ok := err.(*tea.SDKError); ok { logger.Error("VideoModerationWithOptions fail, errinfo: %v", util.DerefString(_t.Data)) + isSuccess = false } } else if util.DerefInt32(resp.StatusCode) != http.StatusOK { logger.Error("response not success. status: %d", util.DerefInt32(resp.StatusCode)) + isSuccess = false } + if !isSuccess { + //handleVideoModerationError(dataId, batchId, msg) + } }