From 24b52988f2b61510be02e4811315f0138e7ea577 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Tue, 23 Jul 2024 16:37:16 +0800 Subject: [PATCH] by Robin at 20240723 --- app/mix/dao/mongo.go | 12 ++++----- app/mix/service/imageauditservice.go | 27 ++++++++++++++++--- app/mix/service/logic/imageaudittask.go | 4 +-- app/mix/service/logic/textaudittask.go | 4 +-- .../service/logic/video_moderation_task.go | 4 +-- app/mix/service/textauditservice.go | 26 +++++++++++++++--- app/mix/service/video_moderation_service.go | 26 +++++++++++++++--- dbstruct/imageaudittask.go | 21 +++++++++++++++ dbstruct/textaudittask.go | 21 +++++++++++++++ library/contentaudit/imageaudit/client.go | 2 +- .../contentaudit/imageaudit/control_block.go | 4 ++- library/contentaudit/textaudit/client.go | 2 +- .../contentaudit/textaudit/control_block.go | 4 ++- .../contentaudit/video_moderation/client.go | 2 +- .../video_moderation/control_block.go | 4 ++- 15 files changed, 133 insertions(+), 30 deletions(-) diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index d79d0bb8..1de2a9ac 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -3091,7 +3091,7 @@ func (m *Mongo) UpdateImageAuditTaskByIds(ctx *gin.Context, imageaudittask *dbst return err } -func (m *Mongo) UpdateOverdueImageAuditTasksStatus(ctx *gin.Context, imageaudittask *dbstruct.ImageAuditTask, batchId string) (*qmgo.UpdateResult, error) { +func (m *Mongo) UpdateOverdueImageAuditTasksStatus(ctx *gin.Context, imageaudittask *dbstruct.ImageAuditTask, batchId string, statuses []int64) (*qmgo.UpdateResult, error) { col := m.getColImageAuditTask() set := qmgo.M{ "status": consts.ImageAudit_Expired, @@ -3109,7 +3109,7 @@ func (m *Mongo) UpdateOverdueImageAuditTasksStatus(ctx *gin.Context, imageauditt "$ne": batchId, }, "status": qmgo.M{ - "$in": []int64{consts.ImageAudit_Rollbacked, consts.ImageAudit_ServiceFailed}, + "$in": statuses, }, } result, err := col.UpdateAll(ctx, filter, up) @@ -3327,7 +3327,7 @@ func (m *Mongo) UpdateTextAuditTaskByIds(ctx *gin.Context, textaudittask *dbstru return err } -func (m *Mongo) UpdateOverdueTextAuditTasksStatus(ctx *gin.Context, textaudittask *dbstruct.TextAuditTask, batchId string) (*qmgo.UpdateResult, error) { +func (m *Mongo) UpdateOverdueTextAuditTasksStatus(ctx *gin.Context, textaudittask *dbstruct.TextAuditTask, batchId string, statuses []int64) (*qmgo.UpdateResult, error) { col := m.getColTextAuditTask() set := qmgo.M{ "status": consts.TextAudit_Expired, @@ -3345,7 +3345,7 @@ func (m *Mongo) UpdateOverdueTextAuditTasksStatus(ctx *gin.Context, textaudittas "$ne": batchId, }, "status": qmgo.M{ - "$in": []int64{consts.TextAudit_Rollbacked, consts.TextAudit_ServiceFailed}, + "$in": statuses, }, } result, err := col.UpdateAll(ctx, filter, up) @@ -5226,7 +5226,7 @@ func (m *Mongo) UpdateVideoModerationTaskByIds(ctx *gin.Context, videoModeration return err } -func (m *Mongo) UpdateOverdueVideoModerationTasksStatus(ctx *gin.Context, videoModerationTask *dbstruct.VideoModerationTask, batchId string) (*qmgo.UpdateResult, error) { +func (m *Mongo) UpdateOverdueVideoModerationTasksStatus(ctx *gin.Context, videoModerationTask *dbstruct.VideoModerationTask, batchId string, statuses []int64) (*qmgo.UpdateResult, error) { col := m.getColVideoModerationTask() set := qmgo.M{ "status": consts.VideoModeration_Expired, @@ -5244,7 +5244,7 @@ func (m *Mongo) UpdateOverdueVideoModerationTasksStatus(ctx *gin.Context, videoM "$ne": batchId, }, "status": qmgo.M{ - "$in": []int64{consts.VideoModeration_Rollbacked, consts.VideoModeration_ServiceFailed}, + "$in": statuses, }, } result, err := col.UpdateAll(ctx, filter, up) diff --git a/app/mix/service/imageauditservice.go b/app/mix/service/imageauditservice.go index 045ef1ec..4dd576ce 100644 --- a/app/mix/service/imageauditservice.go +++ b/app/mix/service/imageauditservice.go @@ -68,7 +68,9 @@ func (s *Service) CreateUpdateStreamerImageAudit(ctx *gin.Context, oldStreamer * }) } - addImageAuditTasks(ctx, tasks) + isDistinct := true + options := []any{isDistinct} + addImageAuditTasks(ctx, tasks, options) return } @@ -115,9 +117,9 @@ func (s *Service) CreateZoneMomentImageAudit(ctx *gin.Context, newZoneMoment *db return } -func addImageAuditTasks(ctx *gin.Context, tasks []*dbstruct.ImageAuditTask) error { +func addImageAuditTasks(ctx *gin.Context, tasks []*dbstruct.ImageAuditTask, options ...any) error { for _, task := range tasks { - err := addImageAuditTask(ctx, task) + err := addImageAuditTask(ctx, task, options...) if err != nil { return err } @@ -125,7 +127,8 @@ func addImageAuditTasks(ctx *gin.Context, tasks []*dbstruct.ImageAuditTask) erro return nil } -func addImageAuditTask(ctx *gin.Context, task *dbstruct.ImageAuditTask) error { +// 20240723需求:加入options,用以控制是否保持对处理对象唯一 +func addImageAuditTask(ctx *gin.Context, task *dbstruct.ImageAuditTask, options ...any) error { if task == nil || task.AuditedMedia == nil { return nil } @@ -160,6 +163,22 @@ func addImageAuditTask(ctx *gin.Context, task *dbstruct.ImageAuditTask) error { } } + var ( + isDistinct = false + ) + if len(options) > 0 { + isDistinct = options[0].(bool) + } + + if isDistinct { + // 将对待处理对象所有未完成的处理全部置为失效 + ovderduedStatuses := []int64{consts.ImageAudit_Created, consts.ImageAudit_Passed, consts.ImageAudit_Rejected, consts.ImageAudit_Rollbacked, consts.ImageAudit_ServiceFailed} + if err := _DefaultImageAuditTask.OpHandleOverdue(ctx, task, "", ovderduedStatuses); err != nil { + logger.Error("Imageaudittask OpHandleOverdue failed: %v", err) + return err + } + } + // 写入图像审核任务表 if err := _DefaultImageAuditTask.OpCreate(ctx, &imageaudittaskproto.OpCreateReq{ ImageAuditTask: task, diff --git a/app/mix/service/logic/imageaudittask.go b/app/mix/service/logic/imageaudittask.go index 86429e31..44326651 100644 --- a/app/mix/service/logic/imageaudittask.go +++ b/app/mix/service/logic/imageaudittask.go @@ -84,8 +84,8 @@ func (p *ImageAuditTask) OpUpdateByIds(ctx *gin.Context, req *imageaudittaskprot } // 处理批次号早于参数提供的批次号的审核任务,全部置为已失效 -func (p *ImageAuditTask) OpHandleOverdue(ctx *gin.Context, task *dbstruct.ImageAuditTask, batchId string) error { - result, err := p.store.UpdateOverdueImageAuditTasksStatus(ctx, task, batchId) +func (p *ImageAuditTask) OpHandleOverdue(ctx *gin.Context, task *dbstruct.ImageAuditTask, batchId string, statuses []int64) error { + result, err := p.store.UpdateOverdueImageAuditTasksStatus(ctx, task, batchId, statuses) if err != nil { logger.Error("UpdateOverdueImageAuditTasksStatus fail, err: %v", err) return err diff --git a/app/mix/service/logic/textaudittask.go b/app/mix/service/logic/textaudittask.go index 77a79ae1..879c8bfb 100644 --- a/app/mix/service/logic/textaudittask.go +++ b/app/mix/service/logic/textaudittask.go @@ -84,8 +84,8 @@ func (p *TextAuditTask) OpUpdateByIds(ctx *gin.Context, req *textaudittaskproto. } // 处理批次号早于参数提供的批次号的审核任务,全部置为已失效 -func (p *TextAuditTask) OpHandleOverdue(ctx *gin.Context, task *dbstruct.TextAuditTask, batchId string) error { - result, err := p.store.UpdateOverdueTextAuditTasksStatus(ctx, task, batchId) +func (p *TextAuditTask) OpHandleOverdue(ctx *gin.Context, task *dbstruct.TextAuditTask, batchId string, statuses []int64) error { + result, err := p.store.UpdateOverdueTextAuditTasksStatus(ctx, task, batchId, statuses) if err != nil { logger.Error("UpdateOverdueTextAuditTasksStatus fail, err: %v", err) return err diff --git a/app/mix/service/logic/video_moderation_task.go b/app/mix/service/logic/video_moderation_task.go index c428dae0..40ba0ce7 100644 --- a/app/mix/service/logic/video_moderation_task.go +++ b/app/mix/service/logic/video_moderation_task.go @@ -84,8 +84,8 @@ func (p *VideoModerationTask) OpUpdateByIds(ctx *gin.Context, req *video_moderat } // 处理批次号早于参数提供的批次号的审核任务,全部置为已失效 -func (p *VideoModerationTask) OpHandleOverdue(ctx *gin.Context, task *dbstruct.VideoModerationTask, batchId string) error { - result, err := p.store.UpdateOverdueVideoModerationTasksStatus(ctx, task, batchId) +func (p *VideoModerationTask) OpHandleOverdue(ctx *gin.Context, task *dbstruct.VideoModerationTask, batchId string, statuses []int64) error { + result, err := p.store.UpdateOverdueVideoModerationTasksStatus(ctx, task, batchId, statuses) if err != nil { logger.Error("UpdateOverdueVideoModerationTasksStatus fail, err: %v", err) return err diff --git a/app/mix/service/textauditservice.go b/app/mix/service/textauditservice.go index 46834ae8..ccb4dc48 100644 --- a/app/mix/service/textauditservice.go +++ b/app/mix/service/textauditservice.go @@ -69,7 +69,9 @@ func (s *Service) CreateUpdateStreamerTextAudit(ctx *gin.Context, oldStreamer *d }) } - addTextAuditTasks(ctx, tasks) + isDistinct := true + options := []any{isDistinct} + addTextAuditTasks(ctx, tasks, options) return } @@ -115,9 +117,9 @@ func (s *Service) CreateZoneMomentTextAudit(ctx *gin.Context, newZoneMoment *dbs return } -func addTextAuditTasks(ctx *gin.Context, tasks []*dbstruct.TextAuditTask) error { +func addTextAuditTasks(ctx *gin.Context, tasks []*dbstruct.TextAuditTask, options ...any) error { for _, task := range tasks { - err := addTextAuditTask(ctx, task) + err := addTextAuditTask(ctx, task, options...) if err != nil { return err } @@ -125,7 +127,7 @@ func addTextAuditTasks(ctx *gin.Context, tasks []*dbstruct.TextAuditTask) error return nil } -func addTextAuditTask(ctx *gin.Context, task *dbstruct.TextAuditTask) error { +func addTextAuditTask(ctx *gin.Context, task *dbstruct.TextAuditTask, options ...any) error { if task == nil || task.AuditedText == nil { return nil } @@ -153,6 +155,22 @@ func addTextAuditTask(ctx *gin.Context, task *dbstruct.TextAuditTask) error { task.TextAuditId = textAudit.Id + var ( + isDistinct = false + ) + if len(options) > 0 { + isDistinct = options[0].(bool) + } + + if isDistinct { + // 将对待处理对象所有未完成的处理全部置为失效 + ovderduedStatuses := []int64{consts.TextAudit_Created, consts.TextAudit_Passed, consts.TextAudit_Rejected, consts.TextAudit_Rollbacked, consts.TextAudit_ServiceFailed} + if err := _DefaultTextAuditTask.OpHandleOverdue(ctx, task, "", ovderduedStatuses); err != nil { + logger.Error("Textaudittask OpHandleOverdue failed: %v", err) + return err + } + } + // 2.写入文字审核任务表 if err := _DefaultTextAuditTask.OpCreate(&gin.Context{}, &textaudittaskproto.OpCreateReq{ TextAuditTask: task, diff --git a/app/mix/service/video_moderation_service.go b/app/mix/service/video_moderation_service.go index ff7a26e1..1697a087 100644 --- a/app/mix/service/video_moderation_service.go +++ b/app/mix/service/video_moderation_service.go @@ -28,7 +28,9 @@ func (s *Service) CreateUpdateStreamerVideoModeration(ctx *gin.Context, oldStrea }) } - addVideoModerationTasks(ctx, tasks) + isDistinct := true + options := []any{isDistinct} + addVideoModerationTasks(ctx, tasks, options) return } @@ -75,9 +77,9 @@ func (s *Service) CreateZoneMomentVideoModeration(ctx *gin.Context, newZoneMomen return } -func addVideoModerationTasks(ctx *gin.Context, tasks []*dbstruct.VideoModerationTask) error { +func addVideoModerationTasks(ctx *gin.Context, tasks []*dbstruct.VideoModerationTask, options ...any) error { for _, task := range tasks { - err := addVideoModerationTask(ctx, task) + err := addVideoModerationTask(ctx, task, options...) if err != nil { return err } @@ -85,7 +87,7 @@ func addVideoModerationTasks(ctx *gin.Context, tasks []*dbstruct.VideoModeration return nil } -func addVideoModerationTask(ctx *gin.Context, task *dbstruct.VideoModerationTask) error { +func addVideoModerationTask(ctx *gin.Context, task *dbstruct.VideoModerationTask, options ...any) error { if task == nil || task.AuditedMedia == nil { return nil } @@ -120,6 +122,22 @@ func addVideoModerationTask(ctx *gin.Context, task *dbstruct.VideoModerationTask } } + var ( + isDistinct = false + ) + if len(options) > 0 { + isDistinct = options[0].(bool) + } + + if isDistinct { + // 将对待处理对象所有未完成的处理全部置为失效 + ovderduedStatuses := []int64{consts.VideoModeration_Created, consts.VideoModeration_Passed, consts.VideoModeration_Rejected, consts.VideoModeration_Rollbacked, consts.VideoModeration_ServiceFailed} + if err := _DefaultVideoModerationTask.OpHandleOverdue(ctx, task, "", ovderduedStatuses); err != nil { + logger.Error("Videomoderationtask OpHandleOverdue failed: %v", err) + return err + } + } + // 写入视频审核任务表 if err := _DefaultVideoModerationTask.OpCreate(&gin.Context{}, &video_moderation_task_proto.OpCreateReq{ VideoModerationTask: task, diff --git a/dbstruct/imageaudittask.go b/dbstruct/imageaudittask.go index 62d2e3de..2847bcd7 100644 --- a/dbstruct/imageaudittask.go +++ b/dbstruct/imageaudittask.go @@ -51,6 +51,27 @@ func (p *ImageAuditTask) GetStatus() int64 { return *p.Status } +func (p *ImageAuditTask) GetBatchId() string { + if p == nil || p.BatchId == nil { + return "" + } + return *p.BatchId +} + +func (p *ImageAuditTask) GetAssociativeDatabase() string { + if p == nil || p.AssociativeDatabase == nil { + return "" + } + return *p.AssociativeDatabase +} + +func (p *ImageAuditTask) GetAssociativeTableName() string { + if p == nil || p.AssociativeTableName == nil { + return "" + } + return *p.AssociativeTableName +} + func (p *ImageAuditTask) GetAssociativeTableId() int64 { if p == nil || p.AssociativeTableId == nil { return 0 diff --git a/dbstruct/textaudittask.go b/dbstruct/textaudittask.go index 503f7ed3..66acefbc 100644 --- a/dbstruct/textaudittask.go +++ b/dbstruct/textaudittask.go @@ -47,6 +47,27 @@ func (p *TextAuditTask) GetStatus() int64 { return *p.Status } +func (p *TextAuditTask) GetBatchId() string { + if p == nil || p.BatchId == nil { + return "" + } + return *p.BatchId +} + +func (p *TextAuditTask) GetAssociativeDatabase() string { + if p == nil || p.AssociativeDatabase == nil { + return "" + } + return *p.AssociativeDatabase +} + +func (p *TextAuditTask) GetAssociativeTableName() string { + if p == nil || p.AssociativeTableName == nil { + return "" + } + return *p.AssociativeTableName +} + func (p *TextAuditTask) GetAssociativeTableId() int64 { if p == nil || p.AssociativeTableId == nil { return 0 diff --git a/library/contentaudit/imageaudit/client.go b/library/contentaudit/imageaudit/client.go index 12763802..4afcd34d 100644 --- a/library/contentaudit/imageaudit/client.go +++ b/library/contentaudit/imageaudit/client.go @@ -35,7 +35,7 @@ type ImageAuditTaskService interface { OpList(ctx *gin.Context, req *imageaudittaskproto.OpListReq) ([]*dbstruct.ImageAuditTask, error) OpUpdateByIds(ctx *gin.Context, req *imageaudittaskproto.OpUpdateByIdsReq) error OpUpdateByBatchId(ctx *gin.Context, batchId string, imageaudittask *dbstruct.ImageAuditTask) error - OpHandleOverdue(ctx *gin.Context, task *dbstruct.ImageAuditTask, batchId string) error + OpHandleOverdue(ctx *gin.Context, task *dbstruct.ImageAuditTask, batchId string, statuses []int64) error } type ContentAuditRTIService interface { diff --git a/library/contentaudit/imageaudit/control_block.go b/library/contentaudit/imageaudit/control_block.go index 499def1b..2ec3344d 100644 --- a/library/contentaudit/imageaudit/control_block.go +++ b/library/contentaudit/imageaudit/control_block.go @@ -2,6 +2,7 @@ package imageaudit import ( "fmt" + "service/api/consts" "service/bizcommon/util" "service/dbstruct" "service/library/logger" @@ -54,7 +55,8 @@ func (ctrlBlock *ImageAuditTaskBatchControlBlock) RecordAction(taskCtrlBlock *Im if ctrlBlock.ActionMap[taskCtrlBlock.ActionId] == nil { // 写入ctrlBlock.ActionMap ctrlBlock.ActionMap[taskCtrlBlock.ActionId] = NewImageAuditAction() // 将此前批次对该元素未成功,待处理的审核全部置为已失效 - if err := _DefaultImageAuditTask.OpHandleOverdue(ctx, taskCtrlBlock.ImageAuditTask, util.DerefString(taskCtrlBlock.ImageAuditTask.BatchId)); err != nil { + failStatuses := []int64{consts.ImageAudit_Rollbacked, consts.ImageAudit_ServiceFailed} + if err := _DefaultImageAuditTask.OpHandleOverdue(ctx, taskCtrlBlock.ImageAuditTask, util.DerefString(taskCtrlBlock.ImageAuditTask.BatchId), failStatuses); err != nil { logger.Error("_DefaultImageAuditTask OpHandleOverdue fail :%v", err) } diff --git a/library/contentaudit/textaudit/client.go b/library/contentaudit/textaudit/client.go index 424cd3a0..31e40033 100644 --- a/library/contentaudit/textaudit/client.go +++ b/library/contentaudit/textaudit/client.go @@ -31,7 +31,7 @@ type TextAuditTaskService interface { OpList(ctx *gin.Context, req *textaudittaskproto.OpListReq) ([]*dbstruct.TextAuditTask, error) OpUpdateByIds(ctx *gin.Context, req *textaudittaskproto.OpUpdateByIdsReq) error OpUpdateByBatchId(ctx *gin.Context, batchId string, textaudittask *dbstruct.TextAuditTask) error - OpHandleOverdue(ctx *gin.Context, task *dbstruct.TextAuditTask, batchId string) error + OpHandleOverdue(ctx *gin.Context, task *dbstruct.TextAuditTask, batchId string, statuses []int64) error } type ContentAuditRTIService interface { diff --git a/library/contentaudit/textaudit/control_block.go b/library/contentaudit/textaudit/control_block.go index d7162273..23f7c3c2 100644 --- a/library/contentaudit/textaudit/control_block.go +++ b/library/contentaudit/textaudit/control_block.go @@ -2,6 +2,7 @@ package textaudit import ( "fmt" + "service/api/consts" "service/bizcommon/util" "service/dbstruct" "service/library/logger" @@ -41,7 +42,8 @@ func (ctrlBlock *TextAuditTaskBatchControlBlock) RecordAction(taskCtrlBlock *Tex if ctrlBlock.ActionMap[taskCtrlBlock.ActionId] == nil { // 写入ctrlBlock.ActionMap ctrlBlock.ActionMap[taskCtrlBlock.ActionId] = NewTextAuditAction() // 将此前批次对该元素未成功,待处理的审核全部置为已失效 - if err := _DefaultTextAuditTask.OpHandleOverdue(ctx, taskCtrlBlock.TextAuditTask, util.DerefString(taskCtrlBlock.TextAuditTask.BatchId)); err != nil { + failStatuses := []int64{consts.TextAudit_Rollbacked, consts.TextAudit_ServiceFailed} + if err := _DefaultTextAuditTask.OpHandleOverdue(ctx, taskCtrlBlock.TextAuditTask, util.DerefString(taskCtrlBlock.TextAuditTask.BatchId), failStatuses); err != nil { logger.Error("_DefaultTextAuditTask OpHandleOverdue fail :%v", err) } diff --git a/library/contentaudit/video_moderation/client.go b/library/contentaudit/video_moderation/client.go index d5c186f6..0876be5e 100644 --- a/library/contentaudit/video_moderation/client.go +++ b/library/contentaudit/video_moderation/client.go @@ -33,7 +33,7 @@ type VideoModerationTaskService interface { OpList(ctx *gin.Context, req *video_moderation_task_proto.OpListReq) ([]*dbstruct.VideoModerationTask, error) OpUpdateByIds(ctx *gin.Context, req *video_moderation_task_proto.OpUpdateByIdsReq) error OpUpdateByBatchId(ctx *gin.Context, batchId string, imageaudittask *dbstruct.VideoModerationTask) error - OpHandleOverdue(ctx *gin.Context, task *dbstruct.VideoModerationTask, batchId string) error + OpHandleOverdue(ctx *gin.Context, task *dbstruct.VideoModerationTask, batchId string, statuses []int64) error } type ContentAuditRTIService interface { diff --git a/library/contentaudit/video_moderation/control_block.go b/library/contentaudit/video_moderation/control_block.go index ffc639fa..64a93738 100644 --- a/library/contentaudit/video_moderation/control_block.go +++ b/library/contentaudit/video_moderation/control_block.go @@ -2,6 +2,7 @@ package videomoderation import ( "fmt" + "service/api/consts" "service/dbstruct" "service/library/logger" @@ -25,7 +26,8 @@ func (ctrlBlock *VideoModerationTaskBatchControlBlock) RecordAction(taskCtrlBloc if ctrlBlock.ActionMap[taskCtrlBlock.ActionId] == nil { // 写入ctrlBlock.ActionMap ctrlBlock.ActionMap[taskCtrlBlock.ActionId] = NewVideoModerationAction() // 将此前批次对该元素未成功,待处理的审核全部置为已失效 - if err := _DefaultVideoModerationTask.OpHandleOverdue(ctx, taskCtrlBlock.VideoModerationTask, taskCtrlBlock.VideoModerationTask.GetBatchId()); err != nil { + failStatuses := []int64{consts.VideoModeration_Rollbacked, consts.VideoModeration_ServiceFailed} + if err := _DefaultVideoModerationTask.OpHandleOverdue(ctx, taskCtrlBlock.VideoModerationTask, taskCtrlBlock.VideoModerationTask.GetBatchId(), failStatuses); err != nil { logger.Error("_DefaultVideoModerationTask OpHandleOverdue fail :%v", err) } -- 2.41.0