by Robin at 20240427

This commit is contained in:
Leufolium 2024-04-27 11:14:44 +08:00
parent b53a96d3bf
commit edb5caf001
9 changed files with 108 additions and 84 deletions

View File

@ -18,9 +18,9 @@ func VideoModerationCallback(ctx *gin.Context) {
return
}
err = videomoderation.HandleVideoModerationResult(req.Content)
err = videomoderation.HandleVideoModerationContent(req.Content)
if err != nil {
logger.Error("HandleVideoModerationResult fail, req: %v, err: %v", util.ToJson(req), err)
logger.Error("HandleVideoModerationContent fail, req: %v, err: %v", util.ToJson(req), err)
return
}

View File

@ -61,6 +61,7 @@ func (s *CronService) Init(c any) (err error) {
exec.RegTask("clear_wrong_pswd_times", s.ClearWrongPswdTimes)
exec.RegTask("video_moderation_batch", s.VideoModerationBatch)
exec.RegTask("video_moderation_batch_his", s.VideoModerationBatchHis)
exec.RegTask("clear_expired_btcb", s.ClearExpiredBtcb)
exec.LogHandler(customLogHandle)
//注册任务handler

View File

@ -482,3 +482,12 @@ func (s *CronService) VideoModerationBatchHis(ctx context.Context, param *xxl.Ru
logger.Info("Video moderation batch ends...")
return handleMsg.String()
}
func (s *CronService) ClearExpiredBtcb(ctx context.Context, param *xxl.RunReq) (msg string) {
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
videomoderation.ClearExpiredBtcb()
logger.Info("Clear expired btcb ends...")
return "Clear expired btcb ends..."
}

View File

@ -95,7 +95,7 @@ textaudit:
video_moderation:
access_key_id: "LTAI5tQBGFS9BNiTHdbAAM74"
access_key_secret: "vsV42N0ZCtJTQNwFLheCaV8oQn69hQ"
notify_url: "https://62375b61.r7.cpolar.top/ext/video_moderation/callback"
notify_url: "https://api.wishpal.cn/ext/video_moderation/callback"
ali_acct_id: "1501583627133461"
seed: "23XSkaZsMTz3xdnKtTmx229ZaJ3VS7e"

View File

@ -98,3 +98,5 @@ var AudioLabelDescMap = map[string]string{
"nontalk": "静音音频",
"C_customized": "用户库命中",
}
const SecElapsedBeforeTaskExpires = 600 // 审核任务10分钟后认为过期

View File

@ -14,6 +14,9 @@ type VideoModerationTaskBatchControlBlock struct {
TaskCtrlBlockMap map[string]*VideoModerationTaskControlBlock // 视频审核任务表id->任务控制块map
VidmodId2taskMap map[string]*VideoModerationTaskControlBlock // 视频审核表id->任务控制块map
ActionMap map[string]*VideoModerationAction // 动作Id号-action的map
Ct int64 // 创建时间戳
TaskNum int64 // 任务数量
FinishedTaskNum int64 // 已完成任务数量
}
func (ctrlBlock *VideoModerationTaskBatchControlBlock) RecordAction(taskCtrlBlock *VideoModerationTaskControlBlock) {
@ -30,6 +33,14 @@ func (ctrlBlock *VideoModerationTaskBatchControlBlock) RecordAction(taskCtrlBloc
ctrlBlock.ActionMap[taskCtrlBlock.ActionId].Record(taskCtrlBlock)
}
func (ctrlBlock *VideoModerationTaskBatchControlBlock) IsBatchFinished() bool {
return ctrlBlock.TaskNum <= ctrlBlock.FinishedTaskNum
}
func (ctrlBlock *VideoModerationTaskBatchControlBlock) FinishATask() {
ctrlBlock.FinishedTaskNum++
}
// 视频审核任务控制块
// ActionId设计初衷由于视频审核是定时任务触发的批量作业如果在一次作业间隔有针对同一个视频媒体的多次更新则会提交关于它的多次审核需要保证数据一致性
type VideoModerationTaskControlBlock struct {

View File

@ -17,13 +17,8 @@ import (
goproto "google.golang.org/protobuf/proto"
)
func HandleVideoModerationResult(content string) (err error) {
isTaskCompleted := false
isActionCompleted := false
action := &VideoModerationAction{}
// 1.获取ResponseBody解析出batchId和视频审核表id
func HandleVideoModerationContent(content string) (err error) {
// 获取ResponseBody解析出batchId和视频审核表id
logger.Info("Unmarshaling ResponseBody...")
result := &green20220302.VideoModerationResultResponseBody{}
err = json.Unmarshal([]byte(content), result)
@ -32,42 +27,65 @@ func HandleVideoModerationResult(content string) (err error) {
return
}
// 2.立即在视频审核表更新该次审核结果
err = handleVideoModerationResultResponseBody(result)
return
}
func handleVideoModerationResultResponseBody(result *green20220302.VideoModerationResultResponseBody) (err error) {
isTaskCompleted := false
isActionCompleted := false
action := &VideoModerationAction{}
// 1.立即在视频审核表更新该次审核结果
logger.Info("Handling its moderation record...")
isPassed, videomoderation, err := handleVideoModeration(result)
if err != nil {
logger.Error("handleVideoModeration fail: %v", err)
}
// 3.通过batchId从缓存的批次任务控制块里获取信息
// 2.通过batchId从缓存的批次任务控制块里获取信息
logger.Info("Retriving batch task control block...")
btcb := getBtcb(videomoderation.GetBatchId())
if btcb == nil {
btcb, ok := getBtcb(videomoderation.GetBatchId())
if !ok || btcb == nil {
// 若任务控制块已不存在,则认为该审核任务已过期
logger.Info("video moderation batch task of batch_id %v has expired and been removed, you may want to rerun it from xxl-job...")
return
}
// 4.取出task
// 3.取出task
logger.Info("Retriving its moderation task...")
tcb := btcb.VidmodId2taskMap[videomoderation.GetId()]
tcb, ok := btcb.VidmodId2taskMap[videomoderation.GetId()]
if !ok || tcb == nil {
logger.Info("video moderation task of id %v has expired and been removed, this is abnormal, you may want to check your code...")
return
}
// 5.在task中记录本分片结果并累积已到达的分片数直到所有分片到达决定该任务是否成功(非分片任务分片数为1)
// 4.在task中记录本分片结果并累积已到达的分片数直到所有分片到达决定该任务是否成功(非分片任务分片数为1)
logger.Info("Recording it to task...")
isTaskCompleted = handleTask(tcb, isPassed, videomoderation)
// 6.通过task的actionId去actionId-[]*task的map查出本批次对该字段的动作链更新其中关于自己的状态
// 5.通过task的actionId去actionId-[]*task的map查出本批次对该字段的动作链更新其中关于自己的状态
logger.Info("Recording the task result...")
if isTaskCompleted {
isActionCompleted, action = handleTaskAction(tcb, btcb.ActionMap)
}
// 7.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
// 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
logger.Info("Recording the action result...")
if isActionCompleted {
if err = finalizeTask(action); err != nil {
logger.Error("finalizeTask fail: %v", err)
}
btcb.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = nil
logger.Info("video moderation task of id %v has finished...", tcb.VideoModerationTask.GetId())
// 如果批次任务已经完成从map中移除该批次任务控制块
batchId := btcb.BatchId
btcb.FinishATask()
if btcb.IsBatchFinished() {
removeBtcb(btcb.BatchId)
}
logger.Info("video moderation batch task of batch_id %v has finished and been removed, all referenced request will be regarded as expired...", batchId)
}
return nil
@ -127,12 +145,12 @@ func handleVideoModeration(result *green20220302.VideoModerationResultResponseBo
sb := &strings.Builder{}
for _, summary := range videomoderation.GetFrameSummaries() {
if summary.GetLabel() != VideoModerationPassLabel {
sb.WriteString(fmt.Sprintf("共检测到%d处%s的画面", summary.GetLabelSum(), FrameLabelDescMap[summary.GetLabel()]))
sb.WriteString(fmt.Sprintf("共检测到%d处%s的画面", summary.GetLabelSum(), FrameLabelDescMap[summary.GetLabel()]))
}
}
for _, summary := range videomoderation.GetAudioSummaries() {
if summary.GetLabel() != AudioModerationPassLabel {
sb.WriteString(fmt.Sprintf("共检测到%d处%s的音频", summary.GetLabelSum(), AudioLabelDescMap[summary.GetLabel()]))
sb.WriteString(fmt.Sprintf("共检测到%d处%s的音频", summary.GetLabelSum(), AudioLabelDescMap[summary.GetLabel()]))
}
}
videomoderation.Description = sb.String()
@ -379,59 +397,3 @@ func updateExpiredTasks(expiredTaskIds []string) (err error) {
}
return
}
// func handleVideoModerationError(dataId string, batchId string, msg string) (err error) {
// ctx := &gin.Context{}
// if err = _DefaultVideoModeration.OpUpdate(ctx, &video_moderation_proto.OpUpdateReq{
// VideoModeration: &dbstruct.VideoModeration{
// Id: goproto.String(dataId),
// Status: goproto.Int64(consts.VideoModeration_ServiceFailed),
// Remarks: goproto.String("批次任务失败原因详见task表"),
// },
// }); err != nil {
// logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err)
// return
// }
// // 先看是否已经批次失败
// btcb := getBtcb(batchId)
// if btcb == nil {
// logger.Error("This video moderation task batch: %v has expired, you may want to rerun it", batchId)
// return
// }
// tcb := btcb.VidmodId2taskMap[dataId]
// if !tcb.Failed {
// tcb.Failed = true
// task := tcb.VideoModerationTask
// if err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{
// VideoModerationTask: &dbstruct.VideoModerationTask{
// Id: tcb.VideoModerationTask.Id,
// Status: goproto.Int64(consts.VideoModeration_ServiceFailed),
// Remarks: goproto.String(msg),
// },
// }); err != nil {
// logger.Error("_DefaultVideoModerationTask OpUpdate fail: %v\n", err)
// return
// }
// task.Status = goproto.Int64(consts.VideoModeration_ServiceFailed)
// if err = _DefaultResultHandler.Handle(ctx, task, consts.VideoModerationTaskUpdate_Rollback); err != nil {
// if err = _DefaultVideoModerationTask.OpUpdate(ctx, &video_moderation_task_proto.OpUpdateReq{
// VideoModerationTask: &dbstruct.VideoModerationTask{
// Id: task.Id,
// Status: goproto.Int64(consts.VideoModeration_Failed),
// Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
// },
// }); err != nil {
// logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err)
// }
// }
// btcb.TaskCtrlBlockMap[task.GetId()] = nil
// }
// return
// }

View File

@ -3,6 +3,7 @@ package videomoderation
import (
"fmt"
"service/library/configcenter"
"service/library/logger"
"time"
)
@ -33,11 +34,26 @@ func genereteBatchId() string {
}
// 获取批次审核任务控制块
func getBtcb(batchId string) *VideoModerationTaskBatchControlBlock {
return defaultVideoModerationTaskScheduler.btcbMp[batchId]
func getBtcb(batchId string) (*VideoModerationTaskBatchControlBlock, bool) {
for k, v := range defaultVideoModerationTaskScheduler.btcbMp {
logger.Info("batch_id: %v, btcb: %v", k, v)
}
logger.Info("now batch_id: %v", batchId)
btcb, ok := defaultVideoModerationTaskScheduler.btcbMp[batchId]
return btcb, ok
}
// 移除批次审核任务控制块
func removeBtcb(batchId string) {
defaultVideoModerationTaskScheduler.btcbMp[batchId] = nil
delete(defaultVideoModerationTaskScheduler.btcbMp, batchId)
}
// 清理过期的批次审核任务控制块
func ClearExpiredBtcb() {
nowTime := time.Now().Unix()
for _, v := range defaultVideoModerationTaskScheduler.btcbMp {
if nowTime-v.Ct > SecElapsedBeforeTaskExpires {
delete(defaultVideoModerationTaskScheduler.btcbMp, v.BatchId)
}
}
}

View File

@ -9,6 +9,7 @@ import (
"service/dbstruct"
"service/library/logger"
"service/library/mediafiller"
"time"
green20220302 "github.com/alibabacloud-go/green-20220302/client"
@ -76,6 +77,9 @@ func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModeratio
TaskCtrlBlockMap: make(map[string]*VideoModerationTaskControlBlock),
VidmodId2taskMap: make(map[string]*VideoModerationTaskControlBlock),
ActionMap: make(map[string]*VideoModerationAction),
TaskNum: int64(len(tasks)),
FinishedTaskNum: 0,
Ct: time.Now().Unix(),
}
// 以batchId+视频审核表的Id作为dataId
@ -85,7 +89,7 @@ func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModeratio
if task.GetIsFragmented() == 0 {
videos = append(videos, task.AuditedMedia)
mediaFillables = append(mediaFillables, task.AuditedMedia)
dataIds = append(dataIds, defaultVideoModerationTaskScheduler.batchId+task.GetVideoModerationId())
dataIds = append(dataIds, task.GetBatchId()+task.GetVideoModerationId())
batchTaskCtrlBlock.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = tcb
batchTaskCtrlBlock.VidmodId2taskMap[task.GetVideoModerationId()] = tcb
@ -135,22 +139,41 @@ func createVideoModerationRequest(videos []*dbstruct.MediaComponent, dataIds []s
return
}
func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, err error, dataId string, batchId string) {
func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, err error, dataId string) {
logger.Info("Receive the response from VideoModerationResponse: %v", resp.String())
isSuccess := true
code := int32(http.StatusOK)
msg := ""
// 针对错误和发送失败的情况写入信息移交handler处理
if err != nil {
if _t, ok := err.(*tea.SDKError); ok {
logger.Error("VideoModerationWithOptions fail, errinfo: %v", util.DerefString(_t.Data))
isSuccess = false
code = http.StatusInternalServerError
msg = err.Error()
}
} else if util.DerefInt32(resp.StatusCode) != http.StatusOK {
logger.Error("response not success. status: %d", util.DerefInt32(resp.StatusCode))
isSuccess = false
code = util.DerefInt32(resp.StatusCode)
msg = util.DerefString(resp.Body.Message)
}
if !isSuccess {
//handleVideoModerationError(dataId, batchId, msg)
rErr := handleVideoModerationResultResponseBody(&green20220302.VideoModerationResultResponseBody{
Code: goproto.Int32(code),
Message: goproto.String(msg),
Data: &green20220302.VideoModerationResultResponseBodyData{
DataId: goproto.String(dataId),
},
})
if rErr != nil {
logger.Error("handleVideoModerationResultResponseBody fail, err: %v", rErr)
}
}
}