package videomoderation

import (
	"encoding/json"
	"net/http"
	"service/api/consts"
	video_moderation_task_proto "service/api/proto/video_moderation_task/proto"
	"service/bizcommon/util"
	"service/dbstruct"
	"service/library/logger"
	"service/library/mediafiller"
	"time"

	green20220302 "github.com/alibabacloud-go/green-20220302/client"
	teautils "github.com/alibabacloud-go/tea-utils/v2/service"
	"github.com/alibabacloud-go/tea/tea"
	goproto "google.golang.org/protobuf/proto"

	"github.com/gin-gonic/gin"
)

// RefreshBatchId returns the scheduler's current batch id and replaces it
// with a freshly generated one.
func RefreshBatchId() string {
	batchId := defaultVideoModerationTaskScheduler.batchId
	defaultVideoModerationTaskScheduler.batchId = genereteBatchId()
	return batchId
}

// Run is the main video moderation routine for one batch.
//
// It lists the tasks of batchId that are still in the Created state, registers
// a batch control block for them, builds one moderation request per video and
// submits each request to the moderation client. Failed submissions are handed
// to handleVideoModerationResponse; successful ones are answered later through
// the callback URL placed in the request.
//
// successNum is the number of tasks picked up for the batch; failNum is never
// set here because per-request failures are reported through the result
// handler rather than counted. err is non-nil only when the task query fails.
func Run(batchId string) (successNum int, failNum int, err error) {
	// 1. Query every moderation task of this batch.
	videoModerationTasks, err := _DefaultVideoModerationTask.OpList(&gin.Context{}, &video_moderation_task_proto.OpListReq{
		BatchId: goproto.String(batchId),
		Status:  goproto.Int64(consts.VideoModeration_Created),
		Sort:    "ct",
	})
	if err != nil {
		// Surface the failure instead of reporting an empty, successful batch.
		logger.Error("_DefaultVideoModerationTask OpList fail: %v", err)
		return 0, 0, err
	}
	if len(videoModerationTasks) == 0 {
		return 0, 0, nil
	}

	logger.Info("Video moderation batch started, batchId : %v, task number : %v", batchId, len(videoModerationTasks))

	// 2. Build the batch control block, collect the videos to audit together
	// with their dataIds, and keep the control block for later lookup.
	btcb, videos, dataIds := createVideoModerationTaskBatchControlBlock(videoModerationTasks)
	btcb.BatchId = batchId
	defaultVideoModerationTaskScheduler.btcbMp[btcb.BatchId] = btcb

	// 3. Build the moderation requests (reqs[i] corresponds to dataIds[i]).
	reqs := createVideoModerationRequest(videos, dataIds)

	// 4. Call the Aliyun service; the moderation result itself is delivered
	// to the callback.
	runtime := &teautils.RuntimeOptions{
		ReadTimeout:    tea.Int(10000),
		ConnectTimeout: tea.Int(10000),
	}
	for i, req := range reqs {
		resp, callErr := defaultVideoModerationClient.VideoModerationWithOptions(req, runtime)
		handleVideoModerationResponse(resp, callErr, dataIds[i])
	}

	successNum = len(videoModerationTasks)
	return successNum, failNum, nil
}

// createVideoModerationTaskBatchControlBlock builds the batch control block
// for tasks and collects the media to audit.
//
// It returns the control block, the media components to submit and, index
// aligned with them, the dataIds. A dataId is the task's batch id followed by
// the video moderation id (or the fragment id for fragmented tasks). The
// collected media are passed to mediafiller.FillList before returning.
func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModerationTask) (batchTaskCtrlBlock *VideoModerationTaskBatchControlBlock, videos []*dbstruct.MediaComponent, dataIds []string) {
	videos = make([]*dbstruct.MediaComponent, 0, len(tasks))
	mediaFillables := make([]mediafiller.MediaFillable, 0, len(tasks))
	dataIds = make([]string, 0, len(tasks))
	batchTaskCtrlBlock = &VideoModerationTaskBatchControlBlock{
		TaskCtrlBlockMap: make(map[string]*VideoModerationTaskControlBlock),
		VidmodId2taskMap: make(map[string]*VideoModerationTaskControlBlock),
		ActionMap:        make(map[string]*VideoModerationAction),
		TaskNum:          int64(len(tasks)),
		FinishedTaskNum:  0,
		Ct:               time.Now().Unix(),
	}

	for _, task := range tasks {
		tcb := NewVideoModerationTaskControlBlock(task)
		batchTaskCtrlBlock.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = tcb

		// Not fragmented: the task's media is audited as a single item.
		if task.GetIsFragmented() == 0 {
			videos = append(videos, task.AuditedMedia)
			mediaFillables = append(mediaFillables, task.AuditedMedia)
			dataIds = append(dataIds, task.GetBatchId()+task.GetVideoModerationId())
			batchTaskCtrlBlock.VidmodId2taskMap[task.GetVideoModerationId()] = tcb
			batchTaskCtrlBlock.RecordAction(tcb)
			continue
		}

		// Fragmented: every video id becomes its own audit item, keyed by the
		// fragment id at the same index.
		videoIds := task.AuditedMedia.GetVideoIds()
		videoAuditIds := task.GetVideoModerationFragmentIds()
		batchTaskCtrlBlock.RecordAction(tcb)
		for i, videoId := range videoIds {
			if i >= len(videoAuditIds) {
				logger.Error("video moderation task %v has %v videos but only %v fragment ids", tcb.VideoModerationTask.GetId(), len(videoIds), len(videoAuditIds))
				break
			}
			video := &dbstruct.MediaComponent{
				VideoIds: util.Int64Slice([]int64{videoId}),
			}
			videos = append(videos, video)
			mediaFillables = append(mediaFillables, video)
			// Use the task's own batch id, as the unfragmented branch does.
			// The scheduler's batchId is replaced by RefreshBatchId and may
			// already belong to a different batch than this task.
			dataIds = append(dataIds, task.GetBatchId()+videoAuditIds[i])
			batchTaskCtrlBlock.VidmodId2taskMap[videoAuditIds[i]] = tcb
		}
	}

	mediafiller.FillList(&gin.Context{}, mediaFillables)
	return
}

// createVideoModerationRequest builds one "videoDetection" request per video.
//
// requests[i] carries dataIds[i], so the result stays index aligned with the
// inputs. A video without a resolved URL is still turned into a request (with
// an empty url) so that the alignment is kept; the problem is logged here and
// the request is left for the moderation service to reject.
func createVideoModerationRequest(videos []*dbstruct.MediaComponent, dataIds []string) (requests []*green20220302.VideoModerationRequest) {
	requests = make([]*green20220302.VideoModerationRequest, 0, len(videos))
	for i, video := range videos {
		if i >= len(dataIds) {
			logger.Error("createVideoModerationRequest: %v videos but only %v dataIds", len(videos), len(dataIds))
			break
		}

		params := map[string]interface{}{
			"dataId":   dataIds[i],
			"callback": _defaultConfig.NotifyUrl,
			"seed":     _defaultConfig.Seed,
		}
		if video == nil || len(video.Videos) == 0 || len(video.Videos[0].Urls) == 0 {
			// Indexing Videos[0].Urls[0] unconditionally would panic here.
			logger.Error("createVideoModerationRequest: no video url available, dataId: %v", dataIds[i])
			params["url"] = ""
		} else {
			params["url"] = video.Videos[0].Urls[0]
		}

		serviceParameters, err := json.Marshal(params)
		if err != nil {
			logger.Error("createVideoModerationRequest: marshal service parameters fail, dataId: %v, err: %v", dataIds[i], err)
		}

		requests = append(requests, &green20220302.VideoModerationRequest{
			Service:           tea.String("videoDetection"),
			ServiceParameters: tea.String(string(serviceParameters)),
		})
	}
	logger.Info("本次打包:%v", requests)
	return
}

// handleVideoModerationResponse inspects the outcome of one submission.
//
// When the call returned an error, returned no response, or returned a
// non-200 status, a result body carrying the failure code, message and dataId
// is passed to handleVideoModerationResultResponseBody. A successful
// submission needs no further action here.
func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, err error, dataId string) {
	isSuccess := true
	code := int32(http.StatusOK)
	msg := ""

	switch {
	case err != nil:
		// Every error is a failed submission, not only SDK errors; resp must
		// not be touched on this path because it may be nil.
		if sdkErr, ok := err.(*tea.SDKError); ok {
			logger.Error("VideoModerationWithOptions fail, errinfo: %v", util.DerefString(sdkErr.Data))
		} else {
			logger.Error("VideoModerationWithOptions fail, err: %v", err)
		}
		isSuccess = false
		code = http.StatusInternalServerError
		msg = err.Error()
	case resp == nil:
		logger.Error("VideoModerationWithOptions returned no response, dataId: %v", dataId)
		isSuccess = false
		code = http.StatusInternalServerError
		msg = "empty video moderation response"
	default:
		logger.Info("Receive the response from VideoModerationResponse: %v", resp.String())
		if status := util.DerefInt32(resp.StatusCode); status != http.StatusOK {
			logger.Error("response not success. status: %d", status)
			isSuccess = false
			code = status
			if resp.Body != nil {
				msg = util.DerefString(resp.Body.Message)
			}
		}
	}

	if isSuccess {
		return
	}

	// Hand the failure to the result handler, identified by dataId.
	rErr := handleVideoModerationResultResponseBody(&green20220302.VideoModerationResultResponseBody{
		Code:    goproto.Int32(code),
		Message: goproto.String(msg),
		Data: &green20220302.VideoModerationResultResponseBodyData{
			DataId: goproto.String(dataId),
		},
	})
	if rErr != nil {
		logger.Error("handleVideoModerationResultResponseBody fail, err: %v", rErr)
	}
}