service/library/contentaudit/video_moderation/handler.go

424 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package videomoderation
import (
"encoding/json"
"fmt"
"net/http"
"service/api/consts"
video_moderation_proto "service/api/proto/video_moderation/proto"
video_moderation_task_proto "service/api/proto/video_moderation_task/proto"
"service/bizcommon/util"
"service/dbstruct"
"service/library/logger"
"strings"
green20220302 "github.com/alibabacloud-go/green-20220302/client"
"github.com/gin-gonic/gin"
goproto "google.golang.org/protobuf/proto"
)
func HandleVideoModerationContent(content string) (batchId string, isBatchFinished bool, err error) {
// 获取ResponseBody解析出batchId和视频审核表id
logger.Info("Unmarshaling ResponseBody...")
result := &green20220302.VideoModerationResultResponseBody{}
err = json.Unmarshal([]byte(content), result)
if err != nil {
logger.Error("json unmarshal failed, err: %v", err)
return
}
batchId, isBatchFinished, err = handleVideoModerationResultResponseBody(result)
return
}
func handleVideoModerationResultResponseBody(result *green20220302.VideoModerationResultResponseBody) (batchId string, isBatchFinished bool, err error) {
batchId = ""
isBatchFinished = false
isTaskCompleted := false
isActionCompleted := false
action := &VideoModerationAction{}
// 1.立即在视频审核表更新该次审核结果
logger.Info("Handling its moderation record...")
isPassed, videomoderation, err := handleVideoModeration(result)
if err != nil {
logger.Error("handleVideoModeration fail: %v", err)
}
// 2.通过batchId从缓存的批次任务控制块里获取信息
logger.Info("Retriving batch task control block...")
btcb, ok := getBtcb(videomoderation.GetBatchId())
if !ok || btcb == nil {
// 若任务控制块已不存在,则认为该审核任务已过期
logger.Info("video moderation batch task of batch_id %v has expired and been removed, you may want to rerun it from xxl-job...")
return
}
// 3.取出task
logger.Info("Retriving its moderation task...")
tcb, ok := btcb.VidmodId2taskMap[videomoderation.GetId()]
if !ok || tcb == nil {
logger.Info("video moderation task of id %v has expired and been removed, this is abnormal, you may want to check your code...")
return
}
// 4.在task中记录本分片结果并累积已到达的分片数直到所有分片到达决定该任务是否成功(非分片任务分片数为1)
logger.Info("Recording it to task...")
isTaskCompleted = handleTask(tcb, isPassed, videomoderation)
// 5.通过task的actionId去actionId-[]*task的map查出本批次对该字段的动作链更新其中关于自己的状态
logger.Info("Recording the task result...")
if isTaskCompleted {
isActionCompleted, action = handleTaskAction(tcb, btcb.ActionMap)
}
// 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
logger.Info("Recording the action result...")
if isActionCompleted {
if err = finalizeTask(action); err != nil {
logger.Error("finalizeTask fail: %v", err)
}
logger.Info("video moderation task of id %v has finished...", tcb.VideoModerationTask.GetId())
// 如果批次任务已经完成从map中移除该批次任务控制块
batchId = btcb.BatchId
btcb.FinishATask()
if btcb.IsBatchFinished() {
isBatchFinished = true
removeBtcb(btcb.BatchId)
}
logger.Info("video moderation batch task of batch_id %v has finished and been removed, all referenced request will be regarded as expired...", batchId)
}
return
}
func handleVideoModeration(result *green20220302.VideoModerationResultResponseBody) (isPassed bool, videomoderation *dbstruct.VideoModeration, err error) {
dataId := util.DerefString(result.Data.DataId)
batchId := dataId[:14]
videomoderationId := dataId[14:]
isPassed = true
videomoderation = &dbstruct.VideoModeration{
Id: goproto.String(videomoderationId),
}
if util.DerefInt32(result.Code) != http.StatusOK {
// 阿里云服务失败
logger.Error("response not success. status: %d", util.DerefInt32(result.Code))
isPassed = false
videomoderation.Status = goproto.Int64(consts.VideoModeration_ServiceFailed)
videomoderation.Remarks = goproto.String(fmt.Sprintf("任务失败:%v", util.DerefString(result.Message)))
} else {
// 阿里云服务成功
frameResult := result.Data.FrameResult
audioResult := result.Data.AudioResult
// 解析信息
isFramesPassed := true
isAudioPassed := true
if frameResult != nil {
var frameSummaries []*dbstruct.FrameSummary
frameSummaries, isFramesPassed = buildFrameSummaries(frameResult.FrameSummarys)
frameDetails := buildFrameDetails(frameResult.Frames)
videomoderation.SetFrameSummaries(frameSummaries).SetFrameDetails(frameDetails)
videomoderation.FrameNum = frameResult.FrameNum
}
if audioResult != nil {
var audioSummaries []*dbstruct.AudioSummary
audioSummaries, isAudioPassed = buildAudioSummaries(audioResult.AudioSummarys)
audioDetails := buildAudioDetails(audioResult.SliceDetails)
videomoderation.SetAudioSummaries(audioSummaries).SetAudioDetails(audioDetails)
}
// 判定检测结果
if isFramesPassed && isAudioPassed {
videomoderation.Status = goproto.Int64(consts.VideoModeration_Passed)
} else {
videomoderation.Status = goproto.Int64(consts.VideoModeration_Rejected)
isPassed = false
}
}
if err = _DefaultVideoModeration.OpUpdate(&gin.Context{}, &video_moderation_proto.OpUpdateReq{
VideoModeration: videomoderation,
}); err != nil {
logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err)
return
}
videomoderation.BatchId = goproto.String(batchId)
switch videomoderation.GetStatus() {
case consts.VideoModeration_Rejected:
sb := &strings.Builder{}
for _, summary := range videomoderation.GetFrameSummaries() {
if summary.GetLabel() != VideoModerationPassLabel {
sb.WriteString(fmt.Sprintf("共检测到%d处%s的画面", summary.GetLabelSum(), FrameLabelDescMap[summary.GetLabel()]))
}
}
for _, summary := range videomoderation.GetAudioSummaries() {
if summary.GetLabel() != AudioModerationPassLabel {
sb.WriteString(fmt.Sprintf("共检测到%d处%s的音频", summary.GetLabelSum(), AudioLabelDescMap[summary.GetLabel()]))
}
}
videomoderation.Description = sb.String()
case consts.VideoModeration_ServiceFailed:
videomoderation.Description = videomoderation.GetRemarks()
}
return
}
func buildFrameSummaries(summaries []*green20220302.VideoModerationResultResponseBodyDataFrameResultFrameSummarys) (frameSummaries []*dbstruct.FrameSummary, isPassed bool) {
isPassed = true
frameSummaries = make([]*dbstruct.FrameSummary, 0)
for _, summary := range summaries {
if summary != nil {
frameSummaries = append(frameSummaries, &dbstruct.FrameSummary{
Label: summary.Label,
LabelSum: summary.LabelSum,
})
if util.DerefString(summary.Label) != VideoModerationPassLabel {
isPassed = false
}
}
}
return
}
func buildFrameDetails(details []*green20220302.VideoModerationResultResponseBodyDataFrameResultFrames) (frameDetails []*dbstruct.FrameDetail) {
frameDetails = make([]*dbstruct.FrameDetail, 0)
for _, detail := range details {
if detail != nil {
frameDetail := &dbstruct.FrameDetail{
Offset: detail.Offset,
}
frameDetail.SetFrameServiceDetails(buildFrameServiceDetails(detail.Results))
frameDetails = append(frameDetails, frameDetail)
}
}
return
}
func buildFrameServiceDetails(details []*green20220302.VideoModerationResultResponseBodyDataFrameResultFramesResults) (frameServiceDetails []*dbstruct.FrameServiceDetail) {
frameServiceDetails = make([]*dbstruct.FrameServiceDetail, 0)
for _, detail := range details {
if detail != nil {
frameServiceDetail := &dbstruct.FrameServiceDetail{
Service: detail.Service,
}
frameServiceDetail.SetFrameLabelDetails(buildFrameLabelDetails(detail.Result))
frameServiceDetails = append(frameServiceDetails, frameServiceDetail)
}
}
return
}
func buildFrameLabelDetails(details []*green20220302.VideoModerationResultResponseBodyDataFrameResultFramesResultsResult) (frameLabelDetails []*dbstruct.FrameLabelDetail) {
frameLabelDetails = make([]*dbstruct.FrameLabelDetail, 0)
for _, detail := range details {
if detail != nil {
frameLabelDetails = append(frameLabelDetails, &dbstruct.FrameLabelDetail{
Confidence: detail.Confidence,
Label: detail.Label,
})
}
}
return
}
func buildAudioSummaries(summaries []*green20220302.VideoModerationResultResponseBodyDataAudioResultAudioSummarys) (audioSummaries []*dbstruct.AudioSummary, isPassed bool) {
isPassed = true
audioSummaries = make([]*dbstruct.AudioSummary, 0)
for _, summary := range summaries {
if summary != nil {
audioSummaries = append(audioSummaries, &dbstruct.AudioSummary{
Label: summary.Label,
LabelSum: summary.LabelSum,
})
if util.DerefString(summary.Label) != AudioModerationPassLabel {
isPassed = false
}
}
}
return
}
func buildAudioDetails(details []*green20220302.VideoModerationResultResponseBodyDataAudioResultSliceDetails) (audioDetails []*dbstruct.AudioDetail) {
audioDetails = make([]*dbstruct.AudioDetail, 0)
for _, detail := range details {
if detail != nil {
audioDetails = append(audioDetails, &dbstruct.AudioDetail{
EndTime: detail.EndTime,
EndTimestamp: detail.EndTimestamp,
Extend: detail.Extend,
Labels: detail.Labels,
RiskTips: detail.RiskTips,
RiskWords: detail.RiskWords,
Score: detail.Score,
StartTime: detail.StartTime,
StartTimestamp: detail.StartTimestamp,
Text: detail.Text,
})
}
}
return
}
// 处理task若task已分片在task中记录本分片结果并累积已到达的分片数直到所有分片到达决定该任务是否成功
func handleTask(task *VideoModerationTaskControlBlock, isPassed bool, videoMod *dbstruct.VideoModeration) (isTaskCompleted bool) {
task.VideoModerationTask.AuditedMediaResults = append(task.VideoModerationTask.AuditedMediaResults, isPassed)
task.VideoModerationTask.Description = append(task.VideoModerationTask.Description, videoMod.Description)
task.IsTaskPassed = task.IsTaskPassed && isPassed
task.AuditedFragmentsNum++
return task.AuditedFragmentsNum <= int(task.VideoModerationTask.GetFragmentsNum())
}
// 通过task的actionId查出action, 通知该task已完成
func handleTaskAction(task *VideoModerationTaskControlBlock, actionMap map[string]*VideoModerationAction) (isActionCompleted bool, action *VideoModerationAction) {
action = actionMap[task.ActionId]
action.AuditedTaskNum++
isActionCompleted = action.TaskNum <= action.AuditedTaskNum
return
}
// 等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
func finalizeTask(action *VideoModerationAction) (err error) {
lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0 // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务和它的索引
passTaskIds := make([]string, 0) // 所有已审核成功的任务的taskId
expiredTaskIds := make([]string, 0) // 所有已失效任务的taskId失效任务是指那些前驱任务依然失败的失败任务或之后已有任务成功的失败任务这些任务已失去实际意义不可以用来回退
statuses := make([]int64, len(action.TaskChain)) // 所有任务的终态状态,成功,回退,或失效
isActionInPassedStatus := true // 当前动作链是否处于成功中
for i, task := range action.TaskChain {
if !task.IsTaskPassed { // 动作链检测到失败任务
if isActionInPassedStatus { // 若动作链处于成功状态,则这是目前为止,最后一个可以用于回退的任务
lastValidTask, lastValidTaskIndex = task, i
statuses[i] = consts.VideoModeration_Rollbacked
isActionInPassedStatus = false
} else { // 否则,若动作链处于失败状态,则该任务已失效
statuses[i] = consts.VideoModeration_Expired
expiredTaskIds = append(expiredTaskIds, task.VideoModerationTask.GetId())
}
} else { // 若动作链检测到成功任务,则重置动作链任务态,并将最后有效任务标志为失效(它已不再用于回退)
passTaskIds = append(passTaskIds, task.VideoModerationTask.GetId())
statuses[i] = consts.VideoModeration_Passed
isActionInPassedStatus = true
if statuses[lastValidTaskIndex] == consts.VideoModeration_Rollbacked {
statuses[lastValidTaskIndex] = consts.VideoModeration_Expired
expiredTaskIds = append(expiredTaskIds, action.TaskChain[lastValidTaskIndex].VideoModerationTask.GetId())
}
}
}
// 判定任务链终态,终态非成功,执行回退操作,记录到数据库中
if !action.IsPassed() {
if err = executeRollBack(lastValidTask.VideoModerationTask); err != nil {
return
}
}
// 更新所有任务状态
if err = updatePassedTasks(passTaskIds); err != nil {
return
}
if err = updateExpiredTasks(expiredTaskIds); err != nil {
return
}
// 终态成功,执行成功后操作
if action.IsPassed() {
if err = handleSuccess(action.TaskChain[len(action.TaskChain)-1].VideoModerationTask); err != nil {
return
}
}
logger.Info("action statuses : %v", statuses)
return
}
func executeRollBack(lastValidTask *dbstruct.VideoModerationTask) (err error) {
ctx := &gin.Context{}
lastValidTask.Status = goproto.Int64(consts.VideoModeration_Rollbacked)
if err = _DefaultResultHandler.Handle(ctx, lastValidTask, consts.VideoModerationTaskUpdate_Rollback); err != nil {
logger.Error("Roll back taskId:%v fail:%v", lastValidTask.GetId(), err)
if err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{
VideoModerationTask: &dbstruct.VideoModerationTask{
Id: lastValidTask.Id,
Status: goproto.Int64(consts.VideoModeration_Failed),
Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
},
}); err != nil {
logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err)
}
return
}
if err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{
VideoModerationTask: &dbstruct.VideoModerationTask{
Id: lastValidTask.Id,
Status: goproto.Int64(consts.VideoModeration_Rollbacked),
Remarks: goproto.String("任务审核失败,已将字段回退至之前的版本"),
},
}); err != nil {
logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err)
}
return
}
func handleSuccess(task *dbstruct.VideoModerationTask) (err error) {
ctx := &gin.Context{}
task.Status = goproto.Int64(consts.VideoModeration_Passed)
if err = _DefaultResultHandler.Handle(ctx, task, consts.VideoModerationTaskUpdate_Success); err != nil {
logger.Error("Handle success taskId:%v fail:%v", task.GetId(), err)
if err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{
VideoModerationTask: &dbstruct.VideoModerationTask{
Id: task.Id,
Status: goproto.Int64(consts.VideoModeration_Failed),
Remarks: goproto.String("任务审核成功,执行成功后操作失败,请联系管理员排查"),
},
}); err != nil {
logger.Error("_DefaultVideoModerationTask OpUpdate fail: %v\n", err)
}
return
}
return
}
func updatePassedTasks(passTaskIds []string) (err error) {
ctx := &gin.Context{}
if err = _DefaultVideoModerationTask.OpUpdateByIds(ctx, &video_moderation_task_proto.OpUpdateByIdsReq{
VideoModerationTask: &dbstruct.VideoModerationTask{
Status: goproto.Int64(consts.VideoModeration_Passed),
Remarks: goproto.String("任务审核通过"),
},
Ids: passTaskIds,
}); err != nil {
logger.Error("_DefaultVideoModeration OpUpdateByIds fail: %v\n", err)
return
}
return
}
func updateExpiredTasks(expiredTaskIds []string) (err error) {
ctx := &gin.Context{}
if err = _DefaultVideoModerationTask.OpUpdateByIds(ctx, &video_moderation_task_proto.OpUpdateByIdsReq{
VideoModerationTask: &dbstruct.VideoModerationTask{
Status: goproto.Int64(consts.VideoModeration_Expired),
Remarks: goproto.String("该任务审核未通过,但之前已有对同字段的审核任务失败,或之后有对同字段的审核任务成功,该任务已失效"),
},
Ids: expiredTaskIds,
}); err != nil {
logger.Error("_DefaultVideoModeration OpUpdateByIds fail: %v\n", err)
return
}
return
}