service/library/contentaudit/video_moderation/video_moderation.go

183 lines
6.1 KiB
Go
Raw Normal View History

2024-04-26 10:46:37 +08:00
package videomoderation
import (
"encoding/json"
"net/http"
"service/api/consts"
video_moderation_task_proto "service/api/proto/video_moderation_task/proto"
"service/bizcommon/util"
"service/dbstruct"
"service/library/logger"
"service/library/mediafiller"
2024-04-27 11:14:44 +08:00
"time"
2024-04-26 10:46:37 +08:00
green20220302 "github.com/alibabacloud-go/green-20220302/client"
teautils "github.com/alibabacloud-go/tea-utils/v2/service"
"github.com/alibabacloud-go/tea/tea"
goproto "google.golang.org/protobuf/proto"
"github.com/gin-gonic/gin"
)
// 刷新批次号
func RefreshBatchId() (string, error) {
batchId, err := _DefaultContentAuditRTI.GetAndUpdateVideoModerationBatchId(&gin.Context{}, genereteBatchId())
if err != nil {
logger.Info("_DefaultContentAuditRTI GetAndUpdateVideoModerationBatchId fail: %v", err)
return "", err
}
return batchId, nil
2024-04-26 10:46:37 +08:00
}
// 视频审核主逻辑
func Run(batchId string) (successNum int, failNum int, err error) {
// 1.查询该批次所有审核任务
videoModerationTasks, err := _DefaultVideoModerationTask.OpList(&gin.Context{}, &video_moderation_task_proto.OpListReq{
BatchId: goproto.String(batchId),
Status: goproto.Int64(consts.VideoModeration_Created),
Sort: "ct",
})
if err != nil {
logger.Info("_DefaultVideoModerationTask OpList fail: %v", err)
}
if len(videoModerationTasks) == 0 {
return 0, 0, nil
}
logger.Info("Video moderation batch started, batchId : %v, task number : %v", batchId, len(videoModerationTasks))
// 2.创建批量任务控制块获取待审核的视频及其dataId(视频审核表Id),将批量任务控制块暂时保存
btcb, videos, dataIds := createVideoModerationTaskBatchControlBlock(videoModerationTasks)
2024-04-27 00:18:31 +08:00
btcb.BatchId = batchId
2024-04-26 10:46:37 +08:00
defaultVideoModerationTaskScheduler.btcbMp[btcb.BatchId] = btcb
// 3.创建审核请求
reqs := createVideoModerationRequest(videos, dataIds)
// 4.调用阿里的服务接收应答由callback处理
runtime := &teautils.RuntimeOptions{
ReadTimeout: tea.Int(10000),
ConnectTimeout: tea.Int(10000),
}
2024-04-27 00:18:31 +08:00
for i, req := range reqs {
2024-04-26 10:46:37 +08:00
_result, err := defaultVideoModerationClient.VideoModerationWithOptions(req, runtime)
2024-04-27 00:18:31 +08:00
handleVideoModerationResponse(_result, err, dataIds[i])
2024-04-26 10:46:37 +08:00
}
successNum = len(videoModerationTasks)
return
}
func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModerationTask) (batchTaskCtrlBlock *VideoModerationTaskBatchControlBlock, videos []*dbstruct.MediaComponent, dataIds []string) {
// 填充媒体获取url和dataId创建action信息
videos = make([]*dbstruct.MediaComponent, 0)
mediaFillables := make([]mediafiller.MediaFillable, 0)
dataIds = make([]string, 0)
batchTaskCtrlBlock = &VideoModerationTaskBatchControlBlock{
TaskCtrlBlockMap: make(map[string]*VideoModerationTaskControlBlock),
VidmodId2taskMap: make(map[string]*VideoModerationTaskControlBlock),
ActionMap: make(map[string]*VideoModerationAction),
2024-04-27 11:14:44 +08:00
TaskNum: int64(len(tasks)),
FinishedTaskNum: 0,
Ct: time.Now().Unix(),
2024-04-26 10:46:37 +08:00
}
// 以batchId+视频审核表的Id作为dataId
for _, task := range tasks {
tcb := NewVideoModerationTaskControlBlock(task)
// 未分片
if task.GetIsFragmented() == 0 {
videos = append(videos, task.AuditedMedia)
mediaFillables = append(mediaFillables, task.AuditedMedia)
2024-04-27 11:14:44 +08:00
dataIds = append(dataIds, task.GetBatchId()+task.GetVideoModerationId())
2024-04-26 10:46:37 +08:00
batchTaskCtrlBlock.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = tcb
batchTaskCtrlBlock.VidmodId2taskMap[task.GetVideoModerationId()] = tcb
batchTaskCtrlBlock.RecordAction(tcb)
} else { // 已分片
videoIds := task.AuditedMedia.GetVideoIds()
videoAuditIds := task.GetVideoModerationFragmentIds()
batchTaskCtrlBlock.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = tcb
batchTaskCtrlBlock.RecordAction(tcb)
for i, videoId := range videoIds {
video := &dbstruct.MediaComponent{
VideoIds: util.Int64Slice([]int64{videoId}),
}
videos = append(videos, video)
mediaFillables = append(mediaFillables, video)
dataIds = append(dataIds, task.GetBatchId()+videoAuditIds[i])
2024-04-26 10:46:37 +08:00
batchTaskCtrlBlock.VidmodId2taskMap[videoAuditIds[i]] = tcb
}
}
}
mediafiller.FillList(&gin.Context{}, mediaFillables)
return
}
func createVideoModerationRequest(videos []*dbstruct.MediaComponent, dataIds []string) (requests []*green20220302.VideoModerationRequest) {
requests = make([]*green20220302.VideoModerationRequest, 0)
for i, video := range videos {
serviceParameters, _ := json.Marshal(
map[string]interface{}{
"url": video.Videos[0].Urls[0],
"dataId": dataIds[i],
"callback": _defaultConfig.NotifyUrl,
"seed": _defaultConfig.Seed,
},
)
request := &green20220302.VideoModerationRequest{
Service: tea.String("videoDetection"),
ServiceParameters: tea.String(string(serviceParameters)),
}
requests = append(requests, request)
}
logger.Info("本次打包:%v", requests)
return
}
2024-04-27 11:14:44 +08:00
func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, err error, dataId string) {
2024-04-26 10:46:37 +08:00
logger.Info("Receive the response from VideoModerationResponse: %v", resp.String())
2024-04-27 00:18:31 +08:00
isSuccess := true
2024-04-27 11:14:44 +08:00
code := int32(http.StatusOK)
msg := ""
// 针对错误和发送失败的情况写入信息移交handler处理
2024-04-26 10:46:37 +08:00
if err != nil {
if _t, ok := err.(*tea.SDKError); ok {
logger.Error("VideoModerationWithOptions fail, errinfo: %v", util.DerefString(_t.Data))
2024-04-27 00:18:31 +08:00
isSuccess = false
2024-04-27 11:14:44 +08:00
code = http.StatusInternalServerError
msg = err.Error()
2024-04-26 10:46:37 +08:00
}
} else if util.DerefInt32(resp.StatusCode) != http.StatusOK {
logger.Error("response not success. status: %d", util.DerefInt32(resp.StatusCode))
2024-04-27 00:18:31 +08:00
isSuccess = false
2024-04-27 11:14:44 +08:00
code = util.DerefInt32(resp.StatusCode)
msg = util.DerefString(resp.Body.Message)
2024-04-26 10:46:37 +08:00
}
2024-04-27 00:18:31 +08:00
if !isSuccess {
2024-04-27 11:14:44 +08:00
rErr := handleVideoModerationResultResponseBody(&green20220302.VideoModerationResultResponseBody{
Code: goproto.Int32(code),
Message: goproto.String(msg),
Data: &green20220302.VideoModerationResultResponseBodyData{
DataId: goproto.String(dataId),
},
})
if rErr != nil {
logger.Error("handleVideoModerationResultResponseBody fail, err: %v", rErr)
}
2024-04-27 00:18:31 +08:00
}
2024-04-26 10:46:37 +08:00
}