by Robin at 20241224
This commit is contained in:
parent
7d6c85ad92
commit
057badc5c1
|
@ -117,6 +117,7 @@ func main() {
|
|||
router := httpengine.NewRouter()
|
||||
middleware.InitJwtAuthenticator(service.DefaultService.OpVerifyToken)
|
||||
service.DefaultService.ConnectToNotifCenter(middleware.InitNotifSender)
|
||||
service.DefaultCronService.ConnectToNotifSender(middleware.DefaultNotifSender)
|
||||
validator.InitDefaultNotNullValidator()
|
||||
controller.InitOffline(router)
|
||||
srv := &http.Server{
|
||||
|
|
|
@ -16,5 +16,5 @@ func InitOffline(r *gin.Engine) {
|
|||
|
||||
// 视频审核callback
|
||||
extVideoModerationGroup := r.Group("/offline_ext/video_moderation")
|
||||
extVideoModerationGroup.POST("callback", middleware.FORMParamValidator(video_moderation_proto.ExtVideoModerationReq{}), VideoModerationCallback)
|
||||
extVideoModerationGroup.POST("callback", middleware.FORMParamValidator(video_moderation_proto.ExtVideoModerationReq{}), VideoModerationCallback, middleware.NotifSender())
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ func VideoModerationCallback(ctx *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
err = videomoderation.HandleVideoModerationContent(req.Content)
|
||||
err = videomoderation.HandleVideoModerationContent(ctx, req.Content)
|
||||
if err != nil {
|
||||
logger.Error("HandleVideoModerationContent fail, req: %v, err: %v", util.ToJson(req), err)
|
||||
return
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"service/library/logger"
|
||||
"strings"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
xxl "github.com/xxl-job/xxl-job-executor-go"
|
||||
)
|
||||
|
||||
|
@ -26,6 +27,7 @@ type ServerConnInfo struct {
|
|||
type CronService struct {
|
||||
fileAbsPath string
|
||||
serverConnInfos []*ServerConnInfo
|
||||
notifSender gin.HandlerFunc
|
||||
}
|
||||
|
||||
func NewCronService() *CronService {
|
||||
|
@ -94,6 +96,10 @@ func (s *CronService) Init(c any) (exec xxl.Executor, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (s *CronService) ConnectToNotifSender(notifSender gin.HandlerFunc) {
|
||||
s.notifSender = notifSender
|
||||
}
|
||||
|
||||
// 自定义日志处理器
|
||||
func customLogHandle(req *xxl.LogReq) *xxl.LogRes {
|
||||
return &xxl.LogRes{Code: xxl.SuccessCode, Msg: "", Content: xxl.LogResContent{
|
||||
|
|
|
@ -75,12 +75,22 @@ func (handler *ImageAuditTaskResultHandler) generateAccountAvatarUpdateFunc() {
|
|||
} else {
|
||||
avatar = task.OldMedia
|
||||
}
|
||||
return _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{
|
||||
|
||||
err := _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{
|
||||
Account: &dbstruct.Account{
|
||||
Mid: mid,
|
||||
Avatar: avatar,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 机审回退增加自动消息发送
|
||||
if option == consts.ImageAuditTaskUpdate_Rollback {
|
||||
DefaultNotifBuilderHandler.Handle(ctx)(consts.AudNotifTemp_AvatarRollbacked)(task.GetAssociativeTableId())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,7 +62,9 @@ func (handler *NotifBuilderHandler) init() {
|
|||
handler.handleSysMembershipPurchased()
|
||||
// 注册审核通知处理
|
||||
handler.handleAudAvatarChangeApplied()
|
||||
handler.handleAudAvatarRollbacked()
|
||||
handler.handleAudNameChangeApplied()
|
||||
handler.handleAudNameRollbacked()
|
||||
handler.handleAudStreamerBasicInfoApplied()
|
||||
handler.handleAudStreamerBasicInfoPassed()
|
||||
handler.handleAudStreamerBasicInfoRejected()
|
||||
|
@ -288,6 +290,13 @@ func (handler *NotifBuilderHandler) handleAudAvatarChangeApplied() {
|
|||
}
|
||||
}
|
||||
|
||||
func (handler *NotifBuilderHandler) handleAudAvatarRollbacked() {
|
||||
handler.handlerMap[consts.AudNotifTemp_AvatarRollbacked] = func(ctx *gin.Context, args ...any) {
|
||||
mid := args[0].(int64)
|
||||
DefaultService.utilWriteNotifInfo(ctx, consts.AudNotifTemp_AvatarRollbacked, mid)
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *NotifBuilderHandler) handleAudNameChangeApplied() {
|
||||
handler.handlerMap[consts.AudNotifTemp_NameChangeApplied] = func(ctx *gin.Context, args ...any) {
|
||||
account := args[0].(*dbstruct.Account)
|
||||
|
@ -297,6 +306,13 @@ func (handler *NotifBuilderHandler) handleAudNameChangeApplied() {
|
|||
}
|
||||
}
|
||||
|
||||
func (handler *NotifBuilderHandler) handleAudNameRollbacked() {
|
||||
handler.handlerMap[consts.AudNotifTemp_NameRollbacked] = func(ctx *gin.Context, args ...any) {
|
||||
mid := args[0].(int64)
|
||||
DefaultService.utilWriteNotifInfo(ctx, consts.AudNotifTemp_NameRollbacked, mid)
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *NotifBuilderHandler) handleAudStreamerBasicInfoApplied() {
|
||||
handler.handlerMap[consts.AudNotifTemp_StreamerBasicInfoApplied] = func(ctx *gin.Context, args ...any) {
|
||||
streamerBasic := args[0].(*dbstruct.StreamerAuthApprovalBasic)
|
||||
|
|
|
@ -105,6 +105,11 @@ func (handler *TextAuditTaskResultHandler) generateAccountNameUpdateFunc() {
|
|||
})
|
||||
}
|
||||
|
||||
// 机审回退增加自动消息发送;仅针对用户
|
||||
if option == consts.TextAuditTaskUpdate_Rollback && acct.GetRole() == consts.User {
|
||||
DefaultNotifBuilderHandler.Handle(ctx)(consts.AudNotifTemp_NameRollbacked)(task.GetAssociativeTableId())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -198,19 +198,24 @@ func (s *CronService) ImageAuditBatch(ctx context.Context, param *xxl.RunReq) (m
|
|||
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
|
||||
handleMsg := &strings.Builder{}
|
||||
|
||||
ctxt := &gin.Context{}
|
||||
|
||||
// 刷新批次号
|
||||
batchId, err := imageaudit.RefreshBatchId()
|
||||
batchId, err := imageaudit.RefreshBatchId(ctxt)
|
||||
if err != nil {
|
||||
return fmt.Sprintf("Refresh batchId failed, err: %v", err)
|
||||
}
|
||||
// 执行图像审核
|
||||
successNum, failNum, err := imageaudit.Run(batchId)
|
||||
successNum, failNum, err := imageaudit.Run(ctxt, batchId)
|
||||
if err != nil {
|
||||
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err))
|
||||
} else {
|
||||
handleMsg.WriteString(fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum))
|
||||
}
|
||||
|
||||
// 发送通知
|
||||
s.notifSender(ctxt)
|
||||
|
||||
logger.Info("Image audit batch ends...")
|
||||
return handleMsg.String()
|
||||
}
|
||||
|
@ -218,16 +223,22 @@ func (s *CronService) ImageAuditBatch(ctx context.Context, param *xxl.RunReq) (m
|
|||
func (s *CronService) ImageAuditBatchHis(ctx context.Context, param *xxl.RunReq) (msg string) {
|
||||
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
|
||||
handleMsg := &strings.Builder{}
|
||||
|
||||
ctxt := &gin.Context{}
|
||||
|
||||
// 刷新批次号
|
||||
batchId := param.ExecutorParams
|
||||
// 执行图像审核
|
||||
successNum, failNum, err := imageaudit.Run(batchId)
|
||||
successNum, failNum, err := imageaudit.Run(ctxt, batchId)
|
||||
if err != nil {
|
||||
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err))
|
||||
} else {
|
||||
handleMsg.WriteString(fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum))
|
||||
}
|
||||
|
||||
// 发送通知
|
||||
s.notifSender(ctxt)
|
||||
|
||||
logger.Info("Image audit batch ends...")
|
||||
return handleMsg.String()
|
||||
}
|
||||
|
@ -236,20 +247,24 @@ func (s *CronService) TextAuditBatch(ctx context.Context, param *xxl.RunReq) (ms
|
|||
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
|
||||
handleMsg := &strings.Builder{}
|
||||
|
||||
ctxt := &gin.Context{}
|
||||
|
||||
// 刷新批次号
|
||||
batchId, err := textaudit.RefreshBatchId()
|
||||
batchId, err := textaudit.RefreshBatchId(ctxt)
|
||||
if err != nil {
|
||||
return fmt.Sprintf("Refresh batchId failed, err: %v", err)
|
||||
}
|
||||
// 执
|
||||
// 执行文字审核
|
||||
successNum, failNum, err := textaudit.Run(batchId)
|
||||
successNum, failNum, err := textaudit.Run(ctxt, batchId)
|
||||
if err != nil {
|
||||
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err))
|
||||
} else {
|
||||
handleMsg.WriteString(fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum))
|
||||
}
|
||||
|
||||
// 发送通知
|
||||
s.notifSender(ctxt)
|
||||
|
||||
logger.Info("Text audit batch ends...")
|
||||
return handleMsg.String()
|
||||
}
|
||||
|
@ -258,16 +273,21 @@ func (s *CronService) TextAuditBatchHis(ctx context.Context, param *xxl.RunReq)
|
|||
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
|
||||
handleMsg := &strings.Builder{}
|
||||
|
||||
ctxt := &gin.Context{}
|
||||
|
||||
// 刷新批次号
|
||||
batchId := param.ExecutorParams
|
||||
// 执行文字审核
|
||||
successNum, failNum, err := textaudit.Run(batchId)
|
||||
successNum, failNum, err := textaudit.Run(ctxt, batchId)
|
||||
if err != nil {
|
||||
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err))
|
||||
} else {
|
||||
handleMsg.WriteString(fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum))
|
||||
}
|
||||
|
||||
// 发送通知
|
||||
s.notifSender(ctxt)
|
||||
|
||||
logger.Info("Text audit batch ends...")
|
||||
return handleMsg.String()
|
||||
}
|
||||
|
@ -524,19 +544,23 @@ func (s *CronService) VideoModerationBatch(ctx context.Context, param *xxl.RunRe
|
|||
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
|
||||
handleMsg := &strings.Builder{}
|
||||
|
||||
ctxt := &gin.Context{}
|
||||
|
||||
// 刷新批次号
|
||||
batchId, err := videomoderation.RefreshBatchId()
|
||||
batchId, err := videomoderation.RefreshBatchId(ctxt)
|
||||
if err != nil {
|
||||
return fmt.Sprintf("Refresh batchId failed, err: %v", err)
|
||||
}
|
||||
// 执行视频审核
|
||||
successNum, failNum, err := videomoderation.Run(batchId)
|
||||
successNum, failNum, err := videomoderation.Run(ctxt, batchId)
|
||||
if err != nil {
|
||||
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, video moderation tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err))
|
||||
} else {
|
||||
handleMsg.WriteString(fmt.Sprintf("batchId : %v, video moderation tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum))
|
||||
}
|
||||
|
||||
// 视频审核作业有回调函数,在回调函数后触发消息发送
|
||||
|
||||
logger.Info("Video moderation batch ends...")
|
||||
return handleMsg.String()
|
||||
}
|
||||
|
@ -544,16 +568,21 @@ func (s *CronService) VideoModerationBatch(ctx context.Context, param *xxl.RunRe
|
|||
func (s *CronService) VideoModerationBatchHis(ctx context.Context, param *xxl.RunReq) (msg string) {
|
||||
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
|
||||
handleMsg := &strings.Builder{}
|
||||
|
||||
ctxt := &gin.Context{}
|
||||
|
||||
// 刷新批次号
|
||||
batchId := param.ExecutorParams
|
||||
// 执行视频审核
|
||||
successNum, failNum, err := videomoderation.Run(batchId)
|
||||
successNum, failNum, err := videomoderation.Run(ctxt, batchId)
|
||||
if err != nil {
|
||||
handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, video moderation tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err))
|
||||
} else {
|
||||
handleMsg.WriteString(fmt.Sprintf("batchId : %v, video moderation tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum))
|
||||
}
|
||||
|
||||
// 视频审核作业有回调函数,在回调函数后触发消息发送
|
||||
|
||||
logger.Info("Video moderation batch ends...")
|
||||
return handleMsg.String()
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ func Init(cfg *configcenter.ImageAuditConfig) (err error) {
|
|||
}
|
||||
|
||||
// batch_id
|
||||
_, err = RefreshBatchId()
|
||||
_, err = RefreshBatchId(&gin.Context{})
|
||||
if err != nil {
|
||||
logger.Error("RefreshBatchId fail, cfg: %v, err: %v", util.ToJson(config), err)
|
||||
return
|
||||
|
|
|
@ -21,7 +21,7 @@ type ImageAuditTaskBatchControlBlock struct {
|
|||
ActionMap map[string]*ImageAuditAction // 动作Id号-action的map
|
||||
}
|
||||
|
||||
func NewImageAuditTaskBatchControlBlock(tasks []*dbstruct.ImageAuditTask, batchId string) *ImageAuditTaskBatchControlBlock {
|
||||
func NewImageAuditTaskBatchControlBlock(ctx *gin.Context, tasks []*dbstruct.ImageAuditTask, batchId string) *ImageAuditTaskBatchControlBlock {
|
||||
if len(tasks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -40,17 +40,16 @@ func NewImageAuditTaskBatchControlBlock(tasks []*dbstruct.ImageAuditTask, batchI
|
|||
for _, task := range tasks {
|
||||
taskCtrlBlock := NewImageAuditTaskControlBlock(task)
|
||||
ctrlBlock.TaskCtrlBlocks = append(ctrlBlock.TaskCtrlBlocks, taskCtrlBlock)
|
||||
ctrlBlock.RecordAction(taskCtrlBlock)
|
||||
ctrlBlock.RecordAction(ctx, taskCtrlBlock)
|
||||
imageIndex, taskIndex = ctrlBlock.RecordImage(taskCtrlBlock, &mediaFillables, imageIndex, taskIndex)
|
||||
}
|
||||
|
||||
mediafiller.FillListInternal(&gin.Context{}, mediaFillables)
|
||||
mediafiller.FillListInternal(ctx, mediaFillables)
|
||||
|
||||
return ctrlBlock
|
||||
}
|
||||
|
||||
func (ctrlBlock *ImageAuditTaskBatchControlBlock) RecordAction(taskCtrlBlock *ImageAuditTaskControlBlock) {
|
||||
ctx := &gin.Context{}
|
||||
func (ctrlBlock *ImageAuditTaskBatchControlBlock) RecordAction(ctx *gin.Context, taskCtrlBlock *ImageAuditTaskControlBlock) {
|
||||
// 写map记录
|
||||
if ctrlBlock.ActionMap[taskCtrlBlock.ActionId] == nil { // 写入ctrlBlock.ActionMap
|
||||
ctrlBlock.ActionMap[taskCtrlBlock.ActionId] = NewImageAuditAction()
|
||||
|
@ -107,8 +106,8 @@ type ImageAuditTaskControlBlock struct {
|
|||
// 新建图像审核任务块
|
||||
func NewImageAuditTaskControlBlock(task *dbstruct.ImageAuditTask) (tcb *ImageAuditTaskControlBlock) {
|
||||
tcb = &ImageAuditTaskControlBlock{
|
||||
ActionId: fmt.Sprintf("%v%v%v%v", util.DerefString(task.AssociativeDatabase), util.DerefString(task.AssociativeTableName),
|
||||
util.DerefInt64(task.AssociativeTableId), util.DerefString(task.AssociativeTableColumn)),
|
||||
ActionId: fmt.Sprintf("%v%v%v%v", task.GetAssociativeDatabase(), task.GetAssociativeTableName(),
|
||||
task.GetAssociativeTableId(), task.GetAssociativeTableColumn()),
|
||||
ImageAuditTask: task,
|
||||
IsTaskPassed: true,
|
||||
AuditedFragmentsNum: 0,
|
||||
|
|
|
@ -19,8 +19,8 @@ import (
|
|||
)
|
||||
|
||||
// 刷新批次号
|
||||
func RefreshBatchId() (string, error) {
|
||||
batchId, err := _DefaultContentAuditRTI.GetAndUpdateImageAuditBatchId(&gin.Context{}, genereteBatchId())
|
||||
func RefreshBatchId(ctx *gin.Context) (string, error) {
|
||||
batchId, err := _DefaultContentAuditRTI.GetAndUpdateImageAuditBatchId(ctx, genereteBatchId())
|
||||
if err != nil {
|
||||
logger.Info("_DefaultContentAuditRTI GetAndUpdateImageAuditBatchId fail: %v", err)
|
||||
return "", err
|
||||
|
@ -29,10 +29,10 @@ func RefreshBatchId() (string, error) {
|
|||
}
|
||||
|
||||
// 图像审核主逻辑
|
||||
func Run(batchId string) (successNum int, failNum int, err error) {
|
||||
func Run(ctx *gin.Context, batchId string) (successNum int, failNum int, err error) {
|
||||
|
||||
// 查询该批次所有审核任务
|
||||
imageaudittasks, err := _DefaultImageAuditTask.OpList(&gin.Context{}, &imageaudittaskproto.OpListReq{
|
||||
imageaudittasks, err := _DefaultImageAuditTask.OpList(ctx, &imageaudittaskproto.OpListReq{
|
||||
BatchId: goproto.String(batchId),
|
||||
Status: goproto.Int64(consts.ImageAudit_Created),
|
||||
Sort: "ct",
|
||||
|
@ -47,14 +47,14 @@ func Run(batchId string) (successNum int, failNum int, err error) {
|
|||
|
||||
logger.Info("Image audit batch started, batchId : %v, task number : %v", batchId, len(imageaudittasks))
|
||||
// 1.构建批量任务控制块,初始化必要的actionMap、填充送审image等信息
|
||||
ctrlBlock := NewImageAuditTaskBatchControlBlock(imageaudittasks, batchId)
|
||||
ctrlBlock := NewImageAuditTaskBatchControlBlock(ctx, imageaudittasks, batchId)
|
||||
|
||||
// 2.创建请求
|
||||
// oss不在上海的服务器,需要调用Advance接口
|
||||
reqs, err := createScanImageAdvanceRequest(ctrlBlock)
|
||||
if err != nil {
|
||||
logger.Info("Create Scan ImageRequest fail: %v", err)
|
||||
handleBatchError(ctrlBlock, err)
|
||||
handleBatchError(ctx, ctrlBlock, err)
|
||||
failNum = len(imageaudittasks)
|
||||
return
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ func Run(batchId string) (successNum int, failNum int, err error) {
|
|||
if _t, ok := err.(*tea.SDKError); ok {
|
||||
logger.Error("ScanImageAdvance fail, errinfo : %v", util.DerefString(_t.Data))
|
||||
}
|
||||
handleBatchError(ctrlBlock, err)
|
||||
handleBatchError(ctx, ctrlBlock, err)
|
||||
failNum = len(imageaudittasks)
|
||||
return
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ func Run(batchId string) (successNum int, failNum int, err error) {
|
|||
}
|
||||
|
||||
// 4.处理应答
|
||||
err = handleScanImageResponseBodyDataResults(ctrlBlock, results)
|
||||
err = handleScanImageResponseBodyDataResults(ctx, ctrlBlock, results)
|
||||
|
||||
successNum = len(imageaudittasks)
|
||||
return
|
||||
|
@ -131,7 +131,7 @@ func createScanImageAdvanceRequest(ctrlBlock *ImageAuditTaskBatchControlBlock) (
|
|||
return
|
||||
}
|
||||
|
||||
func handleScanImageResponseBodyDataResults(ctrlBlock *ImageAuditTaskBatchControlBlock, results []*imageaudit.ScanImageResponseBodyDataResults) (err error) {
|
||||
func handleScanImageResponseBodyDataResults(ctx *gin.Context, ctrlBlock *ImageAuditTaskBatchControlBlock, results []*imageaudit.ScanImageResponseBodyDataResults) (err error) {
|
||||
taskCtrlBlocks := ctrlBlock.TaskCtrlBlocks
|
||||
img2taskIndexMap := ctrlBlock.Img2taskIndexMap
|
||||
actionMap := ctrlBlock.ActionMap
|
||||
|
@ -147,7 +147,7 @@ func handleScanImageResponseBodyDataResults(ctrlBlock *ImageAuditTaskBatchContro
|
|||
|
||||
// 2.立即在imageaudit-imageaudit中更新该次审核结果
|
||||
logger.Info("Handling its audit record...")
|
||||
pass, imageAudit, err := handleImageAudit(dataId, result)
|
||||
pass, imageAudit, err := handleImageAudit(ctx, dataId, result)
|
||||
if err != nil {
|
||||
logger.Error("handleImageAudit fail: %v", err)
|
||||
}
|
||||
|
@ -170,7 +170,7 @@ func handleScanImageResponseBodyDataResults(ctrlBlock *ImageAuditTaskBatchContro
|
|||
// 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
|
||||
logger.Info("Recording the action result...")
|
||||
if isActionCompleted {
|
||||
if err = finalizeTask(action); err != nil {
|
||||
if err = finalizeTask(ctx, action); err != nil {
|
||||
logger.Error("finalizeTask fail: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -179,8 +179,7 @@ func handleScanImageResponseBodyDataResults(ctrlBlock *ImageAuditTaskBatchContro
|
|||
return
|
||||
}
|
||||
|
||||
func handleImageAudit(dataId string, result *imageaudit.ScanImageResponseBodyDataResults) (pass bool, imageaudit *dbstruct.ImageAudit, err error) {
|
||||
ctx := &gin.Context{}
|
||||
func handleImageAudit(ctx *gin.Context, dataId string, result *imageaudit.ScanImageResponseBodyDataResults) (pass bool, imageaudit *dbstruct.ImageAudit, err error) {
|
||||
imageaudit = &dbstruct.ImageAudit{
|
||||
Id: goproto.String(dataId),
|
||||
}
|
||||
|
@ -219,7 +218,7 @@ func handleTaskAction(task *ImageAuditTaskControlBlock, actionMap map[string]*Im
|
|||
}
|
||||
|
||||
// 等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
|
||||
func finalizeTask(action *ImageAuditAction) (err error) {
|
||||
func finalizeTask(ctx *gin.Context, action *ImageAuditAction) (err error) {
|
||||
|
||||
lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0 // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务和它的索引
|
||||
passTaskIds := make([]string, 0) // 所有已审核成功的任务的taskId
|
||||
|
@ -249,22 +248,22 @@ func finalizeTask(action *ImageAuditAction) (err error) {
|
|||
|
||||
// 判定任务链终态,终态非成功,执行回退操作,记录到数据库中
|
||||
if !action.IsPassed() {
|
||||
if err = executeRollBack(lastValidTask.ImageAuditTask); err != nil {
|
||||
if err = executeRollBack(ctx, lastValidTask.ImageAuditTask); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// 更新所有任务状态
|
||||
if err = updatePassedTasks(passTaskIds); err != nil {
|
||||
if err = updatePassedTasks(ctx, passTaskIds); err != nil {
|
||||
return
|
||||
}
|
||||
if err = updateExpiredTasks(expiredTaskIds); err != nil {
|
||||
if err = updateExpiredTasks(ctx, expiredTaskIds); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 终态成功,执行成功后操作
|
||||
if action.IsPassed() {
|
||||
if err = handleSuccess(action.TaskChain[len(action.TaskChain)-1].ImageAuditTask); err != nil {
|
||||
if err = handleSuccess(ctx, action.TaskChain[len(action.TaskChain)-1].ImageAuditTask); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -339,8 +338,7 @@ func copyScanResultInfo(imageaudit *dbstruct.ImageAudit, result *imageaudit.Scan
|
|||
return
|
||||
}
|
||||
|
||||
func executeRollBack(lastValidTask *dbstruct.ImageAuditTask) (err error) {
|
||||
ctx := &gin.Context{}
|
||||
func executeRollBack(ctx *gin.Context, lastValidTask *dbstruct.ImageAuditTask) (err error) {
|
||||
lastValidTask.Status = goproto.Int64(consts.ImageAudit_Rollbacked)
|
||||
if err = _DefaultResultHandler.Handle(ctx, lastValidTask, consts.ImageAuditTaskUpdate_Rollback); err != nil {
|
||||
logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.Id), err)
|
||||
|
@ -367,8 +365,7 @@ func executeRollBack(lastValidTask *dbstruct.ImageAuditTask) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func handleSuccess(task *dbstruct.ImageAuditTask) (err error) {
|
||||
ctx := &gin.Context{}
|
||||
func handleSuccess(ctx *gin.Context, task *dbstruct.ImageAuditTask) (err error) {
|
||||
task.Status = goproto.Int64(consts.ImageAudit_Passed)
|
||||
if err = _DefaultResultHandler.Handle(ctx, task, consts.ImageAuditTaskUpdate_Success); err != nil {
|
||||
logger.Error("Handle success taskId:%v fail:%v", util.DerefString(task.Id), err)
|
||||
|
@ -386,8 +383,7 @@ func handleSuccess(task *dbstruct.ImageAuditTask) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func updatePassedTasks(passTaskIds []string) (err error) {
|
||||
ctx := &gin.Context{}
|
||||
func updatePassedTasks(ctx *gin.Context, passTaskIds []string) (err error) {
|
||||
if err = _DefaultImageAuditTask.OpUpdateByIds(ctx, &imageaudittaskproto.OpUpdateByIdsReq{
|
||||
ImageAuditTask: &dbstruct.ImageAuditTask{
|
||||
Status: goproto.Int64(consts.ImageAudit_Passed),
|
||||
|
@ -401,8 +397,7 @@ func updatePassedTasks(passTaskIds []string) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func updateExpiredTasks(expiredTaskIds []string) (err error) {
|
||||
ctx := &gin.Context{}
|
||||
func updateExpiredTasks(ctx *gin.Context, expiredTaskIds []string) (err error) {
|
||||
if err = _DefaultImageAuditTask.OpUpdateByIds(ctx, &imageaudittaskproto.OpUpdateByIdsReq{
|
||||
ImageAuditTask: &dbstruct.ImageAuditTask{
|
||||
Status: goproto.Int64(consts.ImageAudit_Expired),
|
||||
|
@ -416,9 +411,8 @@ func updateExpiredTasks(expiredTaskIds []string) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func handleBatchError(ctrlBlock *ImageAuditTaskBatchControlBlock, _err error) (err error) {
|
||||
func handleBatchError(ctx *gin.Context, ctrlBlock *ImageAuditTaskBatchControlBlock, _err error) (err error) {
|
||||
logger.Info("All tasks of this batchId: %v has failed, rolling back...", ctrlBlock.BatchId)
|
||||
ctx := &gin.Context{}
|
||||
if err = _DefaultImageAudit.OpUpdateByBatchId(ctx, ctrlBlock.BatchId, &dbstruct.ImageAudit{
|
||||
Status: goproto.Int64(consts.ImageAudit_ServiceFailed),
|
||||
Remarks: goproto.String("批次任务失败,原因详见task表"),
|
||||
|
|
|
@ -62,7 +62,7 @@ func Init(cfg *configcenter.TextAuditConfig) (err error) {
|
|||
// 初始化审核选项
|
||||
labels = strings.Split(cfg.Labels, " ")
|
||||
|
||||
_, err = RefreshBatchId()
|
||||
_, err = RefreshBatchId(&gin.Context{})
|
||||
if err != nil {
|
||||
logger.Error("RefreshBatchId fail, err: %v", err)
|
||||
return
|
||||
|
|
|
@ -17,7 +17,7 @@ type TextAuditTaskBatchControlBlock struct {
|
|||
ActionMap map[string]*TextAuditAction // 动作Id号-action的map
|
||||
}
|
||||
|
||||
func NewTextAuditTaskBatchControlBlock(tasks []*dbstruct.TextAuditTask, batchId string) *TextAuditTaskBatchControlBlock {
|
||||
func NewTextAuditTaskBatchControlBlock(ctx *gin.Context, tasks []*dbstruct.TextAuditTask, batchId string) *TextAuditTaskBatchControlBlock {
|
||||
if len(tasks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -30,14 +30,13 @@ func NewTextAuditTaskBatchControlBlock(tasks []*dbstruct.TextAuditTask, batchId
|
|||
for _, task := range tasks {
|
||||
taskCtrlBlock := NewTextAuditTaskControlBlock(task)
|
||||
ctrlBlock.TaskCtrlBlocks = append(ctrlBlock.TaskCtrlBlocks, taskCtrlBlock)
|
||||
ctrlBlock.RecordAction(taskCtrlBlock)
|
||||
ctrlBlock.RecordAction(ctx, taskCtrlBlock)
|
||||
}
|
||||
|
||||
return ctrlBlock
|
||||
}
|
||||
|
||||
func (ctrlBlock *TextAuditTaskBatchControlBlock) RecordAction(taskCtrlBlock *TextAuditTaskControlBlock) {
|
||||
ctx := &gin.Context{}
|
||||
func (ctrlBlock *TextAuditTaskBatchControlBlock) RecordAction(ctx *gin.Context, taskCtrlBlock *TextAuditTaskControlBlock) {
|
||||
// 写map记录
|
||||
if ctrlBlock.ActionMap[taskCtrlBlock.ActionId] == nil { // 写入ctrlBlock.ActionMap
|
||||
ctrlBlock.ActionMap[taskCtrlBlock.ActionId] = NewTextAuditAction()
|
||||
|
@ -66,8 +65,8 @@ type TextAuditTaskControlBlock struct {
|
|||
// 新建文字审核任务块
|
||||
func NewTextAuditTaskControlBlock(task *dbstruct.TextAuditTask) (tcb *TextAuditTaskControlBlock) {
|
||||
tcb = &TextAuditTaskControlBlock{
|
||||
ActionId: fmt.Sprintf("%v%v%v%v", util.DerefString(task.AssociativeDatabase), util.DerefString(task.AssociativeTableName),
|
||||
util.DerefInt64(task.AssociativeTableId), util.DerefString(task.AssociativeTableColumn)),
|
||||
ActionId: fmt.Sprintf("%v%v%v%v", task.GetAssociativeDatabase(), task.GetAssociativeTableName(),
|
||||
task.GetAssociativeTableId(), task.GetAssociativeTableColumn()),
|
||||
TextAuditTask: task,
|
||||
IsTaskPassed: true,
|
||||
IsGivingNoticeToBatch: false,
|
||||
|
|
|
@ -17,8 +17,8 @@ import (
|
|||
)
|
||||
|
||||
// 刷新批次号
|
||||
func RefreshBatchId() (string, error) {
|
||||
batchId, err := _DefaultContentAuditRTI.GetAndUpdateTextAuditBatchId(&gin.Context{}, genereteBatchId())
|
||||
func RefreshBatchId(ctx *gin.Context) (string, error) {
|
||||
batchId, err := _DefaultContentAuditRTI.GetAndUpdateTextAuditBatchId(ctx, genereteBatchId())
|
||||
if err != nil {
|
||||
logger.Info("_DefaultContentAuditRTI GetAndUpdateTextAuditBatchId fail: %v", err)
|
||||
return "", err
|
||||
|
@ -26,10 +26,10 @@ func RefreshBatchId() (string, error) {
|
|||
return batchId, nil
|
||||
}
|
||||
|
||||
func Run(batchId string) (successNum int, failNum int, err error) {
|
||||
func Run(ctx *gin.Context, batchId string) (successNum int, failNum int, err error) {
|
||||
|
||||
// 查询该批次所有审核任务
|
||||
textaudittasks, err := _DefaultTextAuditTask.OpList(&gin.Context{}, &textaudittaskproto.OpListReq{
|
||||
textaudittasks, err := _DefaultTextAuditTask.OpList(ctx, &textaudittaskproto.OpListReq{
|
||||
BatchId: goproto.String(batchId),
|
||||
Status: goproto.Int64(consts.TextAudit_Created),
|
||||
Sort: "ct",
|
||||
|
@ -44,7 +44,7 @@ func Run(batchId string) (successNum int, failNum int, err error) {
|
|||
|
||||
logger.Info("Text audit batch started, batchId : %v, task number : %v", batchId, len(textaudittasks))
|
||||
// 1.构建批量任务控制块,初始化必要的actionMap、填充送审text等信息
|
||||
ctrlBlock := NewTextAuditTaskBatchControlBlock(textaudittasks, batchId)
|
||||
ctrlBlock := NewTextAuditTaskBatchControlBlock(ctx, textaudittasks, batchId)
|
||||
|
||||
// 2.创建请求
|
||||
reqs, err := createScanTextRequest(ctrlBlock)
|
||||
|
@ -72,7 +72,7 @@ func Run(batchId string) (successNum int, failNum int, err error) {
|
|||
}
|
||||
|
||||
// 4.处理应答
|
||||
err = handleScanTextResponseBodyDataElements(ctrlBlock, results)
|
||||
err = handleScanTextResponseBodyDataElements(ctx, ctrlBlock, results)
|
||||
|
||||
successNum = len(textaudittasks)
|
||||
return
|
||||
|
@ -115,7 +115,7 @@ func createScanTextRequest(ctrlBlock *TextAuditTaskBatchControlBlock) (requests
|
|||
return
|
||||
}
|
||||
|
||||
func handleScanTextResponseBodyDataElements(ctrlBlock *TextAuditTaskBatchControlBlock, results []*textaudit.ScanTextResponseBodyDataElements) (err error) {
|
||||
func handleScanTextResponseBodyDataElements(ctx *gin.Context, ctrlBlock *TextAuditTaskBatchControlBlock, results []*textaudit.ScanTextResponseBodyDataElements) (err error) {
|
||||
taskCtrlBlocks := ctrlBlock.TaskCtrlBlocks
|
||||
actionMap := ctrlBlock.ActionMap
|
||||
for i, result := range results {
|
||||
|
@ -123,7 +123,7 @@ func handleScanTextResponseBodyDataElements(ctrlBlock *TextAuditTaskBatchControl
|
|||
action := &TextAuditAction{}
|
||||
|
||||
// 1.立即在textaudit-textaudit中更新该次审核结果
|
||||
pass, textaudit, err := handleTextAudit(taskCtrlBlocks[i].TextAuditTask.TextAuditId, result)
|
||||
pass, textaudit, err := handleTextAudit(ctx, taskCtrlBlocks[i].TextAuditTask.TextAuditId, result)
|
||||
if err != nil {
|
||||
logger.Error("handleTextAudit fail: %v", err)
|
||||
}
|
||||
|
@ -137,7 +137,7 @@ func handleScanTextResponseBodyDataElements(ctrlBlock *TextAuditTaskBatchControl
|
|||
|
||||
// 4.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
|
||||
if isActionCompleted {
|
||||
if err = finalizeTask(action); err != nil {
|
||||
if err = finalizeTask(ctx, action); err != nil {
|
||||
logger.Error("finalizeTask fail: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -145,8 +145,7 @@ func handleScanTextResponseBodyDataElements(ctrlBlock *TextAuditTaskBatchControl
|
|||
return
|
||||
}
|
||||
|
||||
func handleTextAudit(id *string, result *textaudit.ScanTextResponseBodyDataElements) (pass bool, textaudit *dbstruct.TextAudit, err error) {
|
||||
ctx := &gin.Context{}
|
||||
func handleTextAudit(ctx *gin.Context, id *string, result *textaudit.ScanTextResponseBodyDataElements) (pass bool, textaudit *dbstruct.TextAudit, err error) {
|
||||
textaudit = &dbstruct.TextAudit{
|
||||
Id: id,
|
||||
}
|
||||
|
@ -175,7 +174,7 @@ func handleTaskAction(task *TextAuditTaskControlBlock, actionMap map[string]*Tex
|
|||
}
|
||||
|
||||
// 等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
|
||||
func finalizeTask(action *TextAuditAction) (err error) {
|
||||
func finalizeTask(ctx *gin.Context, action *TextAuditAction) (err error) {
|
||||
lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0 // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务及其索引
|
||||
passTaskIds := make([]string, 0) // 所有已审核成功的任务的taskId
|
||||
expiredTaskIds := make([]string, 0) // 所有已失效任务的taskId,失效任务是指那些前驱任务依然失败的失败任务,这些任务已失去实际意义,不可以用来回退
|
||||
|
@ -203,22 +202,22 @@ func finalizeTask(action *TextAuditAction) (err error) {
|
|||
}
|
||||
// 判定任务链终态,终态非成功,执行回退操作,记录到数据库中
|
||||
if !action.IsPassed() {
|
||||
if err = executeRollBack(lastValidTask.TextAuditTask); err != nil {
|
||||
if err = executeRollBack(ctx, lastValidTask.TextAuditTask); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// 更新所有任务状态
|
||||
if err = updatePassedTasks(passTaskIds); err != nil {
|
||||
if err = updatePassedTasks(ctx, passTaskIds); err != nil {
|
||||
return
|
||||
}
|
||||
if err = updateExpiredTasks(expiredTaskIds); err != nil {
|
||||
if err = updateExpiredTasks(ctx, expiredTaskIds); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 终态成功,执行成功后操作
|
||||
if action.IsPassed() {
|
||||
if err = handleSuccess(action.TaskChain[len(action.TaskChain)-1].TextAuditTask); err != nil {
|
||||
if err = handleSuccess(ctx, action.TaskChain[len(action.TaskChain)-1].TextAuditTask); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -290,8 +289,7 @@ func parseDetailsContexts(contexts []*textaudit.ScanTextResponseBodyDataElements
|
|||
return ctxs
|
||||
}
|
||||
|
||||
func executeRollBack(lastValidTask *dbstruct.TextAuditTask) (err error) {
|
||||
ctx := &gin.Context{}
|
||||
func executeRollBack(ctx *gin.Context, lastValidTask *dbstruct.TextAuditTask) (err error) {
|
||||
lastValidTask.Status = goproto.Int64(consts.TextAudit_Rollbacked)
|
||||
if err = _DefaultResultHandler.Handle(ctx, lastValidTask, consts.TextAuditTaskUpdate_Rollback); err != nil {
|
||||
logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.Id), err)
|
||||
|
@ -318,8 +316,7 @@ func executeRollBack(lastValidTask *dbstruct.TextAuditTask) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func handleSuccess(task *dbstruct.TextAuditTask) (err error) {
|
||||
ctx := &gin.Context{}
|
||||
func handleSuccess(ctx *gin.Context, task *dbstruct.TextAuditTask) (err error) {
|
||||
task.Status = goproto.Int64(consts.TextAudit_Passed)
|
||||
if err = _DefaultResultHandler.Handle(ctx, task, consts.TextAuditTaskUpdate_Success); err != nil {
|
||||
logger.Error("Handle success taskId:%v fail:%v", util.DerefString(task.Id), err)
|
||||
|
@ -337,8 +334,7 @@ func handleSuccess(task *dbstruct.TextAuditTask) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func updatePassedTasks(passTaskIds []string) (err error) {
|
||||
ctx := &gin.Context{}
|
||||
func updatePassedTasks(ctx *gin.Context, passTaskIds []string) (err error) {
|
||||
if err = _DefaultTextAuditTask.OpUpdateByIds(ctx, &textaudittaskproto.OpUpdateByIdsReq{
|
||||
TextAuditTask: &dbstruct.TextAuditTask{
|
||||
Status: goproto.Int64(consts.TextAudit_Passed),
|
||||
|
@ -351,8 +347,7 @@ func updatePassedTasks(passTaskIds []string) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func updateExpiredTasks(expiredTaskIds []string) (err error) {
|
||||
ctx := &gin.Context{}
|
||||
func updateExpiredTasks(ctx *gin.Context, expiredTaskIds []string) (err error) {
|
||||
if err = _DefaultTextAuditTask.OpUpdateByIds(ctx, &textaudittaskproto.OpUpdateByIdsReq{
|
||||
TextAuditTask: &dbstruct.TextAuditTask{
|
||||
Status: goproto.Int64(consts.TextAudit_Expired),
|
||||
|
|
|
@ -72,7 +72,7 @@ func Init(cfg *configcenter.VideoModerationConfig) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
_, err = RefreshBatchId()
|
||||
_, err = RefreshBatchId(&gin.Context{})
|
||||
if err != nil {
|
||||
logger.Error("RefreshBatchId fail, cfg: %v, err: %v", util.ToJson(config), err)
|
||||
return
|
||||
|
|
|
@ -20,8 +20,7 @@ type VideoModerationTaskBatchControlBlock struct {
|
|||
FinishedTaskNum int64 // 已完成任务数量
|
||||
}
|
||||
|
||||
func (ctrlBlock *VideoModerationTaskBatchControlBlock) RecordAction(taskCtrlBlock *VideoModerationTaskControlBlock) {
|
||||
ctx := &gin.Context{}
|
||||
func (ctrlBlock *VideoModerationTaskBatchControlBlock) RecordAction(ctx *gin.Context, taskCtrlBlock *VideoModerationTaskControlBlock) {
|
||||
// 写map记录
|
||||
if ctrlBlock.ActionMap[taskCtrlBlock.ActionId] == nil { // 写入ctrlBlock.ActionMap
|
||||
ctrlBlock.ActionMap[taskCtrlBlock.ActionId] = NewVideoModerationAction()
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
goproto "google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func HandleVideoModerationContent(content string) (err error) {
|
||||
func HandleVideoModerationContent(ctx *gin.Context, content string) (err error) {
|
||||
// 获取ResponseBody,解析出batchId和视频审核表id
|
||||
logger.Info("Unmarshaling ResponseBody...")
|
||||
result := &green20220302.VideoModerationResultResponseBody{}
|
||||
|
@ -27,11 +27,11 @@ func HandleVideoModerationContent(content string) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
err = handleVideoModerationResultResponseBody(result)
|
||||
err = handleVideoModerationResultResponseBody(ctx, result)
|
||||
return
|
||||
}
|
||||
|
||||
func handleVideoModerationResultResponseBody(result *green20220302.VideoModerationResultResponseBody) (err error) {
|
||||
func handleVideoModerationResultResponseBody(ctx *gin.Context, result *green20220302.VideoModerationResultResponseBody) (err error) {
|
||||
|
||||
isTaskCompleted := false
|
||||
isActionCompleted := false
|
||||
|
@ -39,7 +39,7 @@ func handleVideoModerationResultResponseBody(result *green20220302.VideoModerati
|
|||
|
||||
// 1.立即在视频审核表更新该次审核结果
|
||||
logger.Info("Handling its moderation record...")
|
||||
isPassed, videomoderation, err := handleVideoModeration(result)
|
||||
isPassed, videomoderation, err := handleVideoModeration(ctx, result)
|
||||
if err != nil {
|
||||
logger.Error("handleVideoModeration fail: %v", err)
|
||||
}
|
||||
|
@ -74,7 +74,7 @@ func handleVideoModerationResultResponseBody(result *green20220302.VideoModerati
|
|||
// 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
|
||||
logger.Info("Recording the action result...")
|
||||
if isActionCompleted {
|
||||
if err = finalizeTask(action); err != nil {
|
||||
if err = finalizeTask(ctx, action); err != nil {
|
||||
logger.Error("finalizeTask fail: %v", err)
|
||||
}
|
||||
logger.Info("video moderation task of id %v has finished...", tcb.VideoModerationTask.GetId())
|
||||
|
@ -91,7 +91,7 @@ func handleVideoModerationResultResponseBody(result *green20220302.VideoModerati
|
|||
return
|
||||
}
|
||||
|
||||
func handleVideoModeration(result *green20220302.VideoModerationResultResponseBody) (isPassed bool, videomoderation *dbstruct.VideoModeration, err error) {
|
||||
func handleVideoModeration(ctx *gin.Context, result *green20220302.VideoModerationResultResponseBody) (isPassed bool, videomoderation *dbstruct.VideoModeration, err error) {
|
||||
|
||||
dataId := util.DerefString(result.Data.DataId)
|
||||
batchId := dataId[:14]
|
||||
|
@ -140,7 +140,7 @@ func handleVideoModeration(result *green20220302.VideoModerationResultResponseBo
|
|||
}
|
||||
}
|
||||
|
||||
if err = _DefaultVideoModeration.OpUpdate(&gin.Context{}, &video_moderation_proto.OpUpdateReq{
|
||||
if err = _DefaultVideoModeration.OpUpdate(ctx, &video_moderation_proto.OpUpdateReq{
|
||||
VideoModeration: videomoderation,
|
||||
}); err != nil {
|
||||
logger.Error("_DefaultVideoModeration OpUpdate fail: %v\n", err)
|
||||
|
@ -286,7 +286,7 @@ func handleTaskAction(task *VideoModerationTaskControlBlock, actionMap map[strin
|
|||
}
|
||||
|
||||
// 等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
|
||||
func finalizeTask(action *VideoModerationAction) (err error) {
|
||||
func finalizeTask(ctx *gin.Context, action *VideoModerationAction) (err error) {
|
||||
|
||||
lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0 // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务和它的索引
|
||||
passTaskIds := make([]string, 0) // 所有已审核成功的任务的taskId
|
||||
|
@ -316,22 +316,22 @@ func finalizeTask(action *VideoModerationAction) (err error) {
|
|||
|
||||
// 判定任务链终态,终态非成功,执行回退操作,记录到数据库中
|
||||
if !action.IsPassed() {
|
||||
if err = executeRollBack(lastValidTask.VideoModerationTask); err != nil {
|
||||
if err = executeRollBack(ctx, lastValidTask.VideoModerationTask); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// 更新所有任务状态
|
||||
if err = updatePassedTasks(passTaskIds); err != nil {
|
||||
if err = updatePassedTasks(ctx, passTaskIds); err != nil {
|
||||
return
|
||||
}
|
||||
if err = updateExpiredTasks(expiredTaskIds); err != nil {
|
||||
if err = updateExpiredTasks(ctx, expiredTaskIds); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 终态成功,执行成功后操作
|
||||
if action.IsPassed() {
|
||||
if err = handleSuccess(action.TaskChain[len(action.TaskChain)-1].VideoModerationTask); err != nil {
|
||||
if err = handleSuccess(ctx, action.TaskChain[len(action.TaskChain)-1].VideoModerationTask); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -341,8 +341,7 @@ func finalizeTask(action *VideoModerationAction) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func executeRollBack(lastValidTask *dbstruct.VideoModerationTask) (err error) {
|
||||
ctx := &gin.Context{}
|
||||
func executeRollBack(ctx *gin.Context, lastValidTask *dbstruct.VideoModerationTask) (err error) {
|
||||
lastValidTask.Status = goproto.Int64(consts.VideoModeration_Rollbacked)
|
||||
if err = _DefaultResultHandler.Handle(ctx, lastValidTask, consts.VideoModerationTaskUpdate_Rollback); err != nil {
|
||||
logger.Error("Roll back taskId:%v fail:%v", lastValidTask.GetId(), err)
|
||||
|
@ -371,8 +370,7 @@ func executeRollBack(lastValidTask *dbstruct.VideoModerationTask) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func handleSuccess(task *dbstruct.VideoModerationTask) (err error) {
|
||||
ctx := &gin.Context{}
|
||||
func handleSuccess(ctx *gin.Context, task *dbstruct.VideoModerationTask) (err error) {
|
||||
task.Status = goproto.Int64(consts.VideoModeration_Passed)
|
||||
if err = _DefaultResultHandler.Handle(ctx, task, consts.VideoModerationTaskUpdate_Success); err != nil {
|
||||
logger.Error("Handle success taskId:%v fail:%v", task.GetId(), err)
|
||||
|
@ -390,8 +388,7 @@ func handleSuccess(task *dbstruct.VideoModerationTask) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func updatePassedTasks(passTaskIds []string) (err error) {
|
||||
ctx := &gin.Context{}
|
||||
func updatePassedTasks(ctx *gin.Context, passTaskIds []string) (err error) {
|
||||
if err = _DefaultVideoModerationTask.OpUpdateByIds(ctx, &video_moderation_task_proto.OpUpdateByIdsReq{
|
||||
VideoModerationTask: &dbstruct.VideoModerationTask{
|
||||
Status: goproto.Int64(consts.VideoModeration_Passed),
|
||||
|
@ -405,8 +402,7 @@ func updatePassedTasks(passTaskIds []string) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func updateExpiredTasks(expiredTaskIds []string) (err error) {
|
||||
ctx := &gin.Context{}
|
||||
func updateExpiredTasks(ctx *gin.Context, expiredTaskIds []string) (err error) {
|
||||
if err = _DefaultVideoModerationTask.OpUpdateByIds(ctx, &video_moderation_task_proto.OpUpdateByIdsReq{
|
||||
VideoModerationTask: &dbstruct.VideoModerationTask{
|
||||
Status: goproto.Int64(consts.VideoModeration_Expired),
|
||||
|
|
|
@ -21,8 +21,8 @@ import (
|
|||
)
|
||||
|
||||
// 刷新批次号
|
||||
func RefreshBatchId() (string, error) {
|
||||
batchId, err := _DefaultContentAuditRTI.GetAndUpdateVideoModerationBatchId(&gin.Context{}, genereteBatchId())
|
||||
func RefreshBatchId(ctx *gin.Context) (string, error) {
|
||||
batchId, err := _DefaultContentAuditRTI.GetAndUpdateVideoModerationBatchId(ctx, genereteBatchId())
|
||||
if err != nil {
|
||||
logger.Info("_DefaultContentAuditRTI GetAndUpdateVideoModerationBatchId fail: %v", err)
|
||||
return "", err
|
||||
|
@ -31,10 +31,10 @@ func RefreshBatchId() (string, error) {
|
|||
}
|
||||
|
||||
// 视频审核主逻辑
|
||||
func Run(batchId string) (successNum int, failNum int, err error) {
|
||||
func Run(ctx *gin.Context, batchId string) (successNum int, failNum int, err error) {
|
||||
|
||||
// 1.查询该批次所有审核任务
|
||||
videoModerationTasks, err := _DefaultVideoModerationTask.OpList(&gin.Context{}, &video_moderation_task_proto.OpListReq{
|
||||
videoModerationTasks, err := _DefaultVideoModerationTask.OpList(ctx, &video_moderation_task_proto.OpListReq{
|
||||
BatchId: goproto.String(batchId),
|
||||
Status: goproto.Int64(consts.VideoModeration_Created),
|
||||
Sort: "ct",
|
||||
|
@ -50,7 +50,7 @@ func Run(batchId string) (successNum int, failNum int, err error) {
|
|||
logger.Info("Video moderation batch started, batchId : %v, task number : %v", batchId, len(videoModerationTasks))
|
||||
|
||||
// 2.创建批量任务控制块,获取待审核的视频及其dataId(视频审核表Id),将批量任务控制块暂时保存
|
||||
btcb, videos, dataIds := createVideoModerationTaskBatchControlBlock(videoModerationTasks)
|
||||
btcb, videos, dataIds := createVideoModerationTaskBatchControlBlock(ctx, videoModerationTasks)
|
||||
btcb.BatchId = batchId
|
||||
defaultVideoModerationTaskScheduler.btcbMp[btcb.BatchId] = btcb
|
||||
|
||||
|
@ -64,14 +64,14 @@ func Run(batchId string) (successNum int, failNum int, err error) {
|
|||
}
|
||||
for i, req := range reqs {
|
||||
_result, err := defaultVideoModerationClient.VideoModerationWithOptions(req, runtime)
|
||||
handleVideoModerationResponse(_result, err, dataIds[i])
|
||||
handleVideoModerationResponse(ctx, _result, err, dataIds[i])
|
||||
}
|
||||
|
||||
successNum = len(videoModerationTasks)
|
||||
return
|
||||
}
|
||||
|
||||
func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModerationTask) (batchTaskCtrlBlock *VideoModerationTaskBatchControlBlock, videos []*dbstruct.MediaComponent, dataIds []string) {
|
||||
func createVideoModerationTaskBatchControlBlock(ctx *gin.Context, tasks []*dbstruct.VideoModerationTask) (batchTaskCtrlBlock *VideoModerationTaskBatchControlBlock, videos []*dbstruct.MediaComponent, dataIds []string) {
|
||||
// 填充媒体,获取url和dataId,创建action信息
|
||||
videos = make([]*dbstruct.MediaComponent, 0)
|
||||
mediaFillables := make([]mediafiller.MediaFillable, 0)
|
||||
|
@ -96,13 +96,13 @@ func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModeratio
|
|||
|
||||
batchTaskCtrlBlock.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = tcb
|
||||
batchTaskCtrlBlock.VidmodId2taskMap[task.GetVideoModerationId()] = tcb
|
||||
batchTaskCtrlBlock.RecordAction(tcb)
|
||||
batchTaskCtrlBlock.RecordAction(ctx, tcb)
|
||||
} else { // 已分片
|
||||
videoIds := task.AuditedMedia.GetVideoIds()
|
||||
videoAuditIds := task.GetVideoModerationFragmentIds()
|
||||
|
||||
batchTaskCtrlBlock.TaskCtrlBlockMap[tcb.VideoModerationTask.GetId()] = tcb
|
||||
batchTaskCtrlBlock.RecordAction(tcb)
|
||||
batchTaskCtrlBlock.RecordAction(ctx, tcb)
|
||||
for i, videoId := range videoIds {
|
||||
video := &dbstruct.MediaComponent{
|
||||
VideoIds: util.Int64Slice([]int64{videoId}),
|
||||
|
@ -116,7 +116,7 @@ func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModeratio
|
|||
}
|
||||
}
|
||||
|
||||
mediafiller.FillListInternal(&gin.Context{}, mediaFillables)
|
||||
mediafiller.FillListInternal(ctx, mediaFillables)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -142,7 +142,7 @@ func createVideoModerationRequest(videos []*dbstruct.MediaComponent, dataIds []s
|
|||
return
|
||||
}
|
||||
|
||||
func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse, err error, dataId string) {
|
||||
func handleVideoModerationResponse(ctx *gin.Context, resp *green20220302.VideoModerationResponse, err error, dataId string) {
|
||||
|
||||
logger.Info("Receive the response from VideoModerationResponse: %v", resp.String())
|
||||
|
||||
|
@ -168,7 +168,7 @@ func handleVideoModerationResponse(resp *green20220302.VideoModerationResponse,
|
|||
}
|
||||
|
||||
if !isSuccess {
|
||||
rErr := handleVideoModerationResultResponseBody(&green20220302.VideoModerationResultResponseBody{
|
||||
rErr := handleVideoModerationResultResponseBody(ctx, &green20220302.VideoModerationResultResponseBody{
|
||||
Code: goproto.Int32(code),
|
||||
Message: goproto.String(msg),
|
||||
Data: &green20220302.VideoModerationResultResponseBodyData{
|
||||
|
|
Loading…
Reference in New Issue