// Package videomoderation handles asynchronous video moderation results:
// it persists each result, tracks per-task and per-action progress inside the
// cached batch control blocks, and finalizes an action once every task in its
// chain has reported.
package videomoderation

import (
	"encoding/json"
	"fmt"
	"net/http"
	"service/api/consts"
	video_moderation_proto "service/api/proto/video_moderation/proto"
	video_moderation_task_proto "service/api/proto/video_moderation_task/proto"
	"service/bizcommon/util"
	"service/dbstruct"
	"service/library/logger"
	"strings"

	green20220302 "github.com/alibabacloud-go/green-20220302/client"
	"github.com/gin-gonic/gin"
	goproto "google.golang.org/protobuf/proto"
)

// HandleVideoModerationContent decodes content as a JSON-encoded
// VideoModerationResultResponseBody and processes it.
//
// It returns the JSON decoding error, or whatever error the processing step
// reports.
func HandleVideoModerationContent(content string) (err error) {
	logger.Info("Unmarshaling ResponseBody...")
	result := &green20220302.VideoModerationResultResponseBody{}
	if err = json.Unmarshal([]byte(content), result); err != nil {
		logger.Error("json unmarshal failed, err: %v", err)
		return err
	}
	return handleVideoModerationResultResponseBody(result)
}

// handleVideoModerationResultResponseBody processes one moderation result:
//
//  1. persist the result on its video moderation record;
//  2. look up the cached batch task control block by batch id;
//  3. look up the task control block by the record id;
//  4. record the fragment result on the task;
//  5. once the task is complete, notify its action;
//  6. once the action is complete, finalize it and, if the whole batch is
//     finished, drop the batch control block.
//
// It returns an error only when step 1 fails. A missing control block is
// treated as an expired task and reported through the log, not as an error;
// a finalizeTask failure is logged as well.
func handleVideoModerationResultResponseBody(result *green20220302.VideoModerationResultResponseBody) (err error) {
	// 1. Update the moderation record with this result right away.
	logger.Info("Handling its moderation record...")
	isPassed, videomoderation, err := handleVideoModeration(result)
	if err != nil {
		// Without a persisted record (and its batch id) nothing below can be
		// resolved, so stop here instead of reporting a bogus "expired" batch.
		logger.Error("handleVideoModeration fail: %v", err)
		return err
	}

	// 2. Fetch the cached batch task control block by batch id.
	logger.Info("Retriving batch task control block...")
	btcb, ok := getBtcb(videomoderation.GetBatchId())
	if !ok || btcb == nil {
		// A missing control block means the batch is considered expired.
		logger.Info("video moderation batch task of batch_id %v has expired and been removed, you may want to rerun it from xxl-job...", videomoderation.GetBatchId())
		return nil
	}

	// 3. Fetch the task this record belongs to.
	logger.Info("Retriving its moderation task...")
	tcb, ok := btcb.VidmodId2taskMap[videomoderation.GetId()]
	if !ok || tcb == nil {
		logger.Info("video moderation task of id %v has expired and been removed, this is abnormal, you may want to check your code...", videomoderation.GetId())
		return nil
	}

	// 4. Record this fragment on the task; the task is complete once every
	// fragment has arrived (a non-fragmented task has exactly one fragment).
	logger.Info("Recording it to task...")
	isTaskCompleted := handleTask(tcb, isPassed, videomoderation)

	// 5. Tell the owning action that one of its tasks is done.
	logger.Info("Recording the task result...")
	var (
		isActionCompleted bool
		action            *VideoModerationAction
	)
	if isTaskCompleted {
		isActionCompleted, action = handleTaskAction(tcb, btcb.ActionMap)
	}

	// 6. When every task of the action has reported, settle the action.
	logger.Info("Recording the action result...")
	if !isActionCompleted {
		return nil
	}
	if finalizeErr := finalizeTask(action); finalizeErr != nil {
		logger.Error("finalizeTask fail: %v", finalizeErr)
	}
	logger.Info("video moderation task of id %v has finished...", tcb.VideoModerationTask.GetId())

	// Remove the batch control block only when the whole batch is finished.
	btcb.FinishATask()
	if btcb.IsBatchFinished() {
		batchId := btcb.BatchId
		removeBtcb(batchId)
		logger.Info("video moderation batch task of batch_id %v has finished and been removed, all referenced request will be regarded as expired...", batchId)
	}
	return nil
}

// handleVideoModeration converts a moderation result into a
// dbstruct.VideoModeration, persists it through _DefaultVideoModeration.OpUpdate
// and fills in the in-memory-only BatchId and Description fields.
//
// The result's DataId is split into a 14-byte batch id prefix followed by the
// video moderation record id.
//
// Returns:
//   - isPassed: false when the service call failed or any frame/audio summary
//     carries a non-pass label, true otherwise.
//   - videomoderation: the record built from the result; nil when the result
//     cannot be attributed to a record.
//   - err: non-nil when the result has no usable DataId or when the database
//     update fails.
func handleVideoModeration(result *green20220302.VideoModerationResultResponseBody) (isPassed bool, videomoderation *dbstruct.VideoModeration, err error) {
	// Length of the batch id prefix inside DataId.
	const batchIdLen = 14

	if result == nil || result.Data == nil {
		return false, nil, fmt.Errorf("video moderation result carries no data, cannot resolve data id")
	}
	dataId := util.DerefString(result.Data.DataId)
	if len(dataId) <= batchIdLen {
		return false, nil, fmt.Errorf("malformed data id %q: want a %d-byte batch id followed by a record id", dataId, batchIdLen)
	}
	batchId := dataId[:batchIdLen]
	videomoderationId := dataId[batchIdLen:]

	isPassed = true
	videomoderation = &dbstruct.VideoModeration{
		Id: goproto.String(videomoderationId),
	}

	if util.DerefInt32(result.Code) != http.StatusOK {
		// The moderation service itself reported a failure.
		logger.Error("response not success. status: %d", util.DerefInt32(result.Code))
		isPassed = false
		videomoderation.Status = goproto.Int64(consts.VideoModeration_ServiceFailed)
		videomoderation.Remarks = goproto.String(fmt.Sprintf("任务失败:%v", util.DerefString(result.Message)))
	} else {
		// The service succeeded: collect frame and audio findings.
		isFramesPassed := true
		isAudioPassed := true

		if frameResult := result.Data.FrameResult; frameResult != nil {
			var frameSummaries []*dbstruct.FrameSummary
			frameSummaries, isFramesPassed = buildFrameSummaries(frameResult.FrameSummarys)
			frameDetails := buildFrameDetails(frameResult.Frames)
			videomoderation.SetFrameSummaries(frameSummaries).SetFrameDetails(frameDetails)
			videomoderation.FrameNum = frameResult.FrameNum
		}

		if audioResult := result.Data.AudioResult; audioResult != nil {
			var audioSummaries []*dbstruct.AudioSummary
			audioSummaries, isAudioPassed = buildAudioSummaries(audioResult.AudioSummarys)
			audioDetails := buildAudioDetails(audioResult.SliceDetails)
			videomoderation.SetAudioSummaries(audioSummaries).SetAudioDetails(audioDetails)
		}

		// Both the frames and the audio have to pass.
		if isFramesPassed && isAudioPassed {
			videomoderation.Status = goproto.Int64(consts.VideoModeration_Passed)
		} else {
			videomoderation.Status = goproto.Int64(consts.VideoModeration_Rejected)
			isPassed = false
		}
	}

	if err = _DefaultVideoModeration.OpUpdate(&gin.Context{}, &video_moderation_proto.OpUpdateReq{
		VideoModeration: videomoderation,
	}); err != nil {
		logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err)
		return
	}

	// BatchId and Description are set only after the update, so they are not
	// part of what was sent to OpUpdate.
	videomoderation.BatchId = goproto.String(batchId)
	switch videomoderation.GetStatus() {
	case consts.VideoModeration_Rejected:
		var sb strings.Builder
		for _, summary := range videomoderation.GetFrameSummaries() {
			if summary.GetLabel() != VideoModerationPassLabel {
				fmt.Fprintf(&sb, "共检测到%d处%s的画面;", summary.GetLabelSum(), FrameLabelDescMap[summary.GetLabel()])
			}
		}
		for _, summary := range videomoderation.GetAudioSummaries() {
			if summary.GetLabel() != AudioModerationPassLabel {
				fmt.Fprintf(&sb, "共检测到%d处%s的音频;", summary.GetLabelSum(), AudioLabelDescMap[summary.GetLabel()])
			}
		}
		videomoderation.Description = sb.String()
	case consts.VideoModeration_ServiceFailed:
		videomoderation.Description = videomoderation.GetRemarks()
	}
	return
}

// buildFrameSummaries copies the non-nil frame summaries into dbstruct form.
// isPassed is false as soon as one summary has a label other than
// VideoModerationPassLabel. The returned slice is never nil.
func buildFrameSummaries(summaries []*green20220302.VideoModerationResultResponseBodyDataFrameResultFrameSummarys) (frameSummaries []*dbstruct.FrameSummary, isPassed bool) {
	isPassed = true
	frameSummaries = make([]*dbstruct.FrameSummary, 0, len(summaries))
	for _, summary := range summaries {
		if summary == nil {
			continue
		}
		frameSummaries = append(frameSummaries, &dbstruct.FrameSummary{
			Label:    summary.Label,
			LabelSum: summary.LabelSum,
		})
		if util.DerefString(summary.Label) != VideoModerationPassLabel {
			isPassed = false
		}
	}
	return
}

// buildFrameDetails copies the non-nil frames into dbstruct form, including
// their per-service results. The returned slice is never nil.
func buildFrameDetails(details []*green20220302.VideoModerationResultResponseBodyDataFrameResultFrames) (frameDetails []*dbstruct.FrameDetail) {
	frameDetails = make([]*dbstruct.FrameDetail, 0, len(details))
	for _, detail := range details {
		if detail == nil {
			continue
		}
		frameDetail := &dbstruct.FrameDetail{
			Offset: detail.Offset,
		}
		frameDetail.SetFrameServiceDetails(buildFrameServiceDetails(detail.Results))
		frameDetails = append(frameDetails, frameDetail)
	}
	return
}

// buildFrameServiceDetails copies the non-nil per-service frame results into
// dbstruct form, including their label details. The returned slice is never nil.
func buildFrameServiceDetails(details []*green20220302.VideoModerationResultResponseBodyDataFrameResultFramesResults) (frameServiceDetails []*dbstruct.FrameServiceDetail) {
	frameServiceDetails = make([]*dbstruct.FrameServiceDetail, 0, len(details))
	for _, detail := range details {
		if detail == nil {
			continue
		}
		frameServiceDetail := &dbstruct.FrameServiceDetail{
			Service: detail.Service,
		}
		frameServiceDetail.SetFrameLabelDetails(buildFrameLabelDetails(detail.Result))
		frameServiceDetails = append(frameServiceDetails, frameServiceDetail)
	}
	return
}

// buildFrameLabelDetails copies the non-nil label results (confidence and
// label) into dbstruct form. The returned slice is never nil.
func buildFrameLabelDetails(details []*green20220302.VideoModerationResultResponseBodyDataFrameResultFramesResultsResult) (frameLabelDetails []*dbstruct.FrameLabelDetail) {
	frameLabelDetails = make([]*dbstruct.FrameLabelDetail, 0, len(details))
	for _, detail := range details {
		if detail == nil {
			continue
		}
		frameLabelDetails = append(frameLabelDetails, &dbstruct.FrameLabelDetail{
			Confidence: detail.Confidence,
			Label:      detail.Label,
		})
	}
	return
}

// buildAudioSummaries copies the non-nil audio summaries into dbstruct form.
// isPassed is false as soon as one summary has a label other than
// AudioModerationPassLabel. The returned slice is never nil.
func buildAudioSummaries(summaries []*green20220302.VideoModerationResultResponseBodyDataAudioResultAudioSummarys) (audioSummaries []*dbstruct.AudioSummary, isPassed bool) {
	isPassed = true
	audioSummaries = make([]*dbstruct.AudioSummary, 0, len(summaries))
	for _, summary := range summaries {
		if summary == nil {
			continue
		}
		audioSummaries = append(audioSummaries, &dbstruct.AudioSummary{
			Label:    summary.Label,
			LabelSum: summary.LabelSum,
		})
		if util.DerefString(summary.Label) != AudioModerationPassLabel {
			isPassed = false
		}
	}
	return
}

// buildAudioDetails copies the non-nil audio slice details into dbstruct form
// field by field. The returned slice is never nil.
func buildAudioDetails(details []*green20220302.VideoModerationResultResponseBodyDataAudioResultSliceDetails) (audioDetails []*dbstruct.AudioDetail) {
	audioDetails = make([]*dbstruct.AudioDetail, 0, len(details))
	for _, detail := range details {
		if detail == nil {
			continue
		}
		audioDetails = append(audioDetails, &dbstruct.AudioDetail{
			EndTime:        detail.EndTime,
			EndTimestamp:   detail.EndTimestamp,
			Extend:         detail.Extend,
			Labels:         detail.Labels,
			RiskTips:       detail.RiskTips,
			RiskWords:      detail.RiskWords,
			Score:          detail.Score,
			StartTime:      detail.StartTime,
			StartTimestamp: detail.StartTimestamp,
			Text:           detail.Text,
		})
	}
	return
}

// handleTask records one fragment result on task: it appends the fragment's
// verdict and description, folds the verdict into task.IsTaskPassed and bumps
// the count of fragments received so far.
//
// It reports true exactly when the number of received fragments reaches the
// task's FragmentsNum, i.e. on the arrival of the last fragment. Earlier
// fragments must not complete the task, otherwise the owning action would be
// notified once per fragment instead of once per task.
func handleTask(task *VideoModerationTaskControlBlock, isPassed bool, videoMod *dbstruct.VideoModeration) (isTaskCompleted bool) {
	task.VideoModerationTask.AuditedMediaResults = append(task.VideoModerationTask.AuditedMediaResults, isPassed)
	task.VideoModerationTask.Description = append(task.VideoModerationTask.Description, videoMod.Description)
	task.IsTaskPassed = task.IsTaskPassed && isPassed
	task.AuditedFragmentsNum++
	return task.AuditedFragmentsNum == int(task.VideoModerationTask.GetFragmentsNum())
}

// handleTaskAction looks up the action owning task by task.ActionId and counts
// the task as audited. It reports whether the number of audited tasks has
// reached the action's TaskNum, together with the action itself.
//
// When actionMap has no entry for the task's action id, it logs the problem
// and returns (false, nil).
func handleTaskAction(task *VideoModerationTaskControlBlock, actionMap map[string]*VideoModerationAction) (isActionCompleted bool, action *VideoModerationAction) {
	action = actionMap[task.ActionId]
	if action == nil {
		logger.Error("video moderation action of id %v not found in action map", task.ActionId)
		return false, nil
	}
	action.AuditedTaskNum++
	isActionCompleted = action.TaskNum <= action.AuditedTaskNum
	return
}

// finalizeTask settles an action whose tasks have all reported.
//
// It walks action.TaskChain in order and classifies every task:
//   - a passed task is marked Passed; if the most recent rollback candidate is
//     still marked Rollbacked, that candidate becomes Expired;
//   - a failed task seen while the chain is in the passed state becomes the
//     rollback candidate (Rollbacked);
//   - a failed task seen while the chain is already in the failed state is
//     Expired.
//
// Afterwards, if action.IsPassed() is false the rollback candidate is rolled
// back; the passed and expired task ids are written to the database; and if
// action.IsPassed() is true the success handler runs on the last task of the
// chain. The first error from any of these steps is returned and stops the
// remaining steps.
//
// It returns an error for a nil action or an empty task chain.
func finalizeTask(action *VideoModerationAction) (err error) {
	if action == nil || len(action.TaskChain) == 0 {
		return fmt.Errorf("cannot finalize a video moderation action without tasks")
	}

	// Only meaningful when the chain does not end in success: the last failed
	// task that can be used for a rollback, and its index.
	lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0
	// Ids of all tasks that passed moderation.
	passTaskIds := make([]string, 0)
	// Ids of all expired tasks: failed tasks whose predecessor had already
	// failed, or failed tasks followed by a passed task. They can no longer be
	// used for a rollback.
	expiredTaskIds := make([]string, 0)
	// Final status of every task: passed, rollbacked or expired.
	statuses := make([]int64, len(action.TaskChain))
	// Whether the chain is currently in the passed state.
	isActionInPassedStatus := true

	for i, task := range action.TaskChain {
		if !task.IsTaskPassed {
			if isActionInPassedStatus {
				// First failure after a passed state: so far this is the last
				// task that can serve as the rollback target.
				lastValidTask, lastValidTaskIndex = task, i
				statuses[i] = consts.VideoModeration_Rollbacked
				isActionInPassedStatus = false
			} else {
				// The chain was already failing, so this task is expired.
				statuses[i] = consts.VideoModeration_Expired
				expiredTaskIds = append(expiredTaskIds, task.VideoModerationTask.GetId())
			}
			continue
		}

		// A passed task resets the chain state and expires the previous
		// rollback candidate, which is no longer needed.
		passTaskIds = append(passTaskIds, task.VideoModerationTask.GetId())
		statuses[i] = consts.VideoModeration_Passed
		isActionInPassedStatus = true
		if statuses[lastValidTaskIndex] == consts.VideoModeration_Rollbacked {
			statuses[lastValidTaskIndex] = consts.VideoModeration_Expired
			expiredTaskIds = append(expiredTaskIds, action.TaskChain[lastValidTaskIndex].VideoModerationTask.GetId())
		}
	}

	// The chain did not end in success: roll back and record it.
	if !action.IsPassed() {
		if err = executeRollBack(lastValidTask.VideoModerationTask); err != nil {
			return err
		}
	}

	// Persist the status of every classified task.
	if err = updatePassedTasks(passTaskIds); err != nil {
		return err
	}
	if err = updateExpiredTasks(expiredTaskIds); err != nil {
		return err
	}

	// The chain ended in success: run the post-success handling.
	if action.IsPassed() {
		if err = handleSuccess(action.TaskChain[len(action.TaskChain)-1].VideoModerationTask); err != nil {
			return err
		}
	}

	logger.Info("action statuses : %v", statuses)
	return nil
}

// executeRollBack marks lastValidTask as Rollbacked, runs the result handler
// with the rollback update type and records the outcome on the task in the
// database (Rollbacked on success, Failed when the handler fails).
//
// The returned error is the one from the database write. A handler failure is
// logged and recorded as a Failed status but is not returned.
// NOTE(review): the handler error is dropped once the Failed status has been
// written; callers therefore carry on with the remaining status updates.
// Confirm this is intended.
func executeRollBack(lastValidTask *dbstruct.VideoModerationTask) (err error) {
	ctx := &gin.Context{}
	lastValidTask.Status = goproto.Int64(consts.VideoModeration_Rollbacked)

	if handleErr := _DefaultResultHandler.Handle(ctx, lastValidTask, consts.VideoModerationTaskUpdate_Rollback); handleErr != nil {
		logger.Error("Roll back taskId:%v fail:%v", lastValidTask.GetId(), handleErr)
		err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{
			VideoModerationTask: &dbstruct.VideoModerationTask{
				Id:      lastValidTask.Id,
				Status:  goproto.Int64(consts.VideoModeration_Failed),
				Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
			},
		})
		if err != nil {
			logger.Error("_DefaultVideoModerationTask OpUpdate fail: %v\n", err)
		}
		return err
	}

	err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{
		VideoModerationTask: &dbstruct.VideoModerationTask{
			Id:      lastValidTask.Id,
			Status:  goproto.Int64(consts.VideoModeration_Rollbacked),
			Remarks: goproto.String("任务审核失败,已将字段回退至之前的版本"),
		},
	})
	if err != nil {
		logger.Error("_DefaultVideoModerationTask OpUpdate fail: %v\n", err)
	}
	return err
}

// handleSuccess marks task as Passed and runs the result handler with the
// success update type. When the handler fails, the task is recorded as Failed
// in the database.
//
// The returned error is the one from that database write; it is nil when the
// handler succeeds, and also when the handler fails but the Failed status was
// written successfully.
// NOTE(review): the handler error is dropped once the Failed status has been
// written. Confirm this is intended.
func handleSuccess(task *dbstruct.VideoModerationTask) (err error) {
	ctx := &gin.Context{}
	task.Status = goproto.Int64(consts.VideoModeration_Passed)

	handleErr := _DefaultResultHandler.Handle(ctx, task, consts.VideoModerationTaskUpdate_Success)
	if handleErr == nil {
		return nil
	}
	logger.Error("Handle success taskId:%v fail:%v", task.GetId(), handleErr)

	err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{
		VideoModerationTask: &dbstruct.VideoModerationTask{
			Id:      task.Id,
			Status:  goproto.Int64(consts.VideoModeration_Failed),
			Remarks: goproto.String("任务审核成功,执行成功后操作失败,请联系管理员排查"),
		},
	})
	if err != nil {
		logger.Error("_DefaultVideoModerationTask OpUpdate fail: %v\n", err)
	}
	return err
}

// updatePassedTasks writes the Passed status and its remark to every task in
// passTaskIds with a single OpUpdateByIds call and returns that call's error.
func updatePassedTasks(passTaskIds []string) (err error) {
	ctx := &gin.Context{}
	err = _DefaultVideoModerationTask.OpUpdateByIds(ctx, &video_moderation_task_proto.OpUpdateByIdsReq{
		VideoModerationTask: &dbstruct.VideoModerationTask{
			Status:  goproto.Int64(consts.VideoModeration_Passed),
			Remarks: goproto.String("任务审核通过"),
		},
		Ids: passTaskIds,
	})
	if err != nil {
		logger.Error("_DefaultVideoModerationTask OpUpdateByIds fail: %v\n", err)
	}
	return err
}

// updateExpiredTasks writes the Expired status and its remark to every task in
// expiredTaskIds with a single OpUpdateByIds call and returns that call's error.
func updateExpiredTasks(expiredTaskIds []string) (err error) {
	ctx := &gin.Context{}
	err = _DefaultVideoModerationTask.OpUpdateByIds(ctx, &video_moderation_task_proto.OpUpdateByIdsReq{
		VideoModerationTask: &dbstruct.VideoModerationTask{
			Status:  goproto.Int64(consts.VideoModeration_Expired),
			Remarks: goproto.String("该任务审核未通过,但之前已有对同字段的审核任务失败,或之后有对同字段的审核任务成功,该任务已失效"),
		},
		Ids: expiredTaskIds,
	})
	if err != nil {
		logger.Error("_DefaultVideoModerationTask OpUpdateByIds fail: %v\n", err)
	}
	return err
}