feat-IRONFANS-86-Robin #378

chenhao merged 8 commits from feat-IRONFANS-86-Robin into main 2024-04-29 22:20:06 +08:00
3 changed files with 119 additions and 33 deletions
Showing only changes of commit ddbba1f860 - Show all commits

View File

@ -70,6 +70,20 @@ func (p *VideoModeration) GetBatchId() string {
return *p.BatchId
func (p *VideoModeration) GetStatus() int64 {
if p == nil || p.Status == nil {
return 0
return *p.Status
func (p *VideoModeration) GetRemarks() string {
if p == nil || p.Remarks == nil {
return ""
return *p.Remarks
func (p *VideoModeration) GetFrameSummaries() []*FrameSummary {
if p == nil || p.FrameSummaries == nil {
return make([]*FrameSummary, 0)

View File

@ -44,21 +44,21 @@ func HandleVideoModerationResult(content string) (err error) {
btcb := getBtcb(videomoderation.GetBatchId())
if btcb == nil {
// 若任务控制块已不存在,则认为该审核任务已过期
// 4.取出task
logger.Info("Retriving its moderation task...")
task := btcb.VidmodId2taskMap[videomoderation.GetId()]
tcb := btcb.VidmodId2taskMap[videomoderation.GetId()]
// 5.在task中记录本分片结果并累积已到达的分片数直到所有分片到达决定该任务是否成功(非分片任务分片数为1)
logger.Info("Recording it to task...")
isTaskCompleted = handleTask(task, isPassed, videomoderation)
isTaskCompleted = handleTask(tcb, isPassed, videomoderation)
// 6.通过task的actionId去actionId-[]*task的map查出本批次对该字段的动作链更新其中关于自己的状态
logger.Info("Recording the task result...")
if isTaskCompleted {
isActionCompleted, action = handleTaskAction(task, btcb.ActionMap)
isActionCompleted, action = handleTaskAction(tcb, btcb.ActionMap)
// 7.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
@ -67,46 +67,52 @@ func HandleVideoModerationResult(content string) (err error) {
if err = finalizeTask(action); err != nil {
logger.Error("finalizeTask fail: %v", err)
// 从批次任务控制块map中移除该控制块
btcb.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = nil
return nil
func handleVideoModeration(result *green20220302.VideoModerationResultResponseBody) (isPassed bool, videomoderation *dbstruct.VideoModeration, err error) {
// 阿里云服务失败
if util.DerefInt32(result.Code) != http.StatusOK {
logger.Error("response not success. status: %d", util.DerefInt32(result.Code))
dataId := util.DerefString(result.Data.DataId)
batchId := dataId[:14]
videomoderationId := dataId[14:]
isPassed = true
videomoderation = &dbstruct.VideoModeration{
Id: goproto.String(videomoderationId),
if util.DerefInt32(result.Code) != http.StatusOK {
// 阿里云服务失败
logger.Error("response not success. status: %d", util.DerefInt32(result.Code))
isPassed = false
videomoderation.Status = goproto.Int64(consts.VideoModeration_ServiceFailed)
videomoderation.Remarks = goproto.String(fmt.Sprintf("任务失败:%v", util.DerefString(result.Message)))
} else {
// 阿里云服务成功
frameResult := result.Data.FrameResult
audioResult := result.Data.AudioResult
// 解析信息
dataId := util.DerefString(result.Data.DataId)
batchId := dataId[:14]
videomoderationId := dataId[14:]
frameSummaries, isFramesPassed := buildFrameSummaries(frameResult.FrameSummarys)
frameDetails := buildFrameDetails(frameResult.Frames)
audioSummaries, isAudioPassed := buildAudioSummaries(audioResult.AudioSummarys)
audioDetails := buildAudioDetails(result.Data.AudioResult.SliceDetails)
videomoderation = &dbstruct.VideoModeration{
Id: goproto.String(videomoderationId),
FrameNum: frameResult.FrameNum,
videomoderation.FrameNum = frameResult.FrameNum
// 判定检测结果
if isFramesPassed && isAudioPassed {
videomoderation.Status = goproto.Int64(consts.VideoModeration_Passed)
isPassed = true
} else {
videomoderation.Status = goproto.Int64(consts.VideoModeration_Rejected)
isPassed = false
if err = _DefaultVideoModeration.OpUpdate(&gin.Context{}, &video_moderation_proto.OpUpdateReq{
VideoModeration: videomoderation,
@ -116,7 +122,8 @@ func handleVideoModeration(result *green20220302.VideoModerationResultResponseBo
videomoderation.BatchId = goproto.String(batchId)
if isFramesPassed && isAudioPassed {
switch videomoderation.GetStatus() {
case consts.VideoModeration_Rejected:
sb := &strings.Builder{}
for _, summary := range videomoderation.GetFrameSummaries() {
if summary.GetLabel() != VideoModerationPassLabel {
@ -128,6 +135,9 @@ func handleVideoModeration(result *green20220302.VideoModerationResultResponseBo
sb.WriteString(fmt.Sprintf("共检测到%d处%s的音频", summary.GetLabelSum(), AudioLabelDescMap[summary.GetLabel()]))
videomoderation.Description = sb.String()
case consts.VideoModeration_ServiceFailed:
videomoderation.Description = videomoderation.GetRemarks()
@ -369,3 +379,59 @@ func updateExpiredTasks(expiredTaskIds []string) (err error) {
// func handleVideoModerationError(dataId string, batchId string, msg string) (err error) {
// ctx := &gin.Context{}
// if err = _DefaultVideoModeration.OpUpdate(ctx, &video_moderation_proto.OpUpdateReq{
// VideoModeration: &dbstruct.VideoModeration{
// Id: goproto.String(dataId),
// Status: goproto.Int64(consts.VideoModeration_ServiceFailed),
// Remarks: goproto.String("批次任务失败原因详见task表"),
// },
// }); err != nil {
// logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err)
// return
// }
// // 先看是否已经批次失败
// btcb := getBtcb(batchId)
// if btcb == nil {
// logger.Error("This video moderation task batch: %v has expired, you may want to rerun it", batchId)
// return
// }
// tcb := btcb.VidmodId2taskMap[dataId]
// if !tcb.Failed {
// tcb.Failed = true
// task := tcb.VideoModerationTask
// if err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{
// VideoModerationTask: &dbstruct.VideoModerationTask{
// Id: tcb.VideoModerationTask.Id,
// Status: goproto.Int64(consts.VideoModeration_ServiceFailed),
// Remarks: goproto.String(msg),
// },
// }); err != nil {
// logger.Error("_DefaultVideoModerationTask OpUpdate fail: %v\n", err)
// return
// }
// task.Status = goproto.Int64(consts.VideoModeration_ServiceFailed)
// if err = _DefaultResultHandler.Handle(ctx, task, consts.VideoModerationTaskUpdate_Rollback); err != nil {
// if err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{
// VideoModerationTask: &dbstruct.VideoModerationTask{
// Id: task.Id,
// Status: goproto.Int64(consts.VideoModeration_Failed),
// Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
// },
// }); err != nil {
// logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err)
// }
// }
// btcb.TaskCtrlBlockMap[task.GetId()] = nil
// }
// return
// }

View File

@ -47,7 +47,7 @@ func Run(batchId string) (successNum int, failNum int, err error) {
// 2.创建批量任务控制块获取待审核的视频及其dataId(视频审核表Id),将批量任务控制块暂时保存
btcb, videos, dataIds := createVideoModerationTaskBatchControlBlock(videoModerationTasks)
btcb.BatchId = defaultVideoModerationTaskScheduler.batchId
btcb.BatchId = batchId
defaultVideoModerationTaskScheduler.btcbMp[btcb.BatchId] = btcb
// 3.创建审核请求
@ -58,9 +58,9 @@ func Run(batchId string) (successNum int, failNum int, err error) {
ReadTimeout: tea.Int(10000),
ConnectTimeout: tea.Int(10000),
for _, req := range reqs {
for i, req := range reqs {
_result, err := defaultVideoModerationClient.VideoModerationWithOptions(req, runtime)
handleVideoModerationResponse(_result, err)
handleVideoModerationResponse(_result, err, dataIds[i])
successNum = len(videoModerationTasks)
@ -135,16 +135,22 @@ func createVideoModerationRequest(videos []*dbstruct.MediaComponent, dataIds []s
func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, err error) {
func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, err error, dataId string, batchId string) {
logger.Info("Receive the response from VideoModerationResponse: %v", resp.String())
isSuccess := true
if err != nil {
if _t, ok := err.(*tea.SDKError); ok {
logger.Error("VideoModerationWithOptions fail, errinfo: %v", util.DerefString(_t.Data))
isSuccess = false
} else if util.DerefInt32(resp.StatusCode) != http.StatusOK {
logger.Error("response not success. status: %d", util.DerefInt32(resp.StatusCode))
isSuccess = false
if !isSuccess {
//handleVideoModerationError(dataId, batchId, msg)