by Robin at 20240730

This commit is contained in:
Leufolium 2024-07-30 18:40:15 +08:00
parent d9ef34138b
commit 7af7d2d6f1
7 changed files with 241 additions and 97 deletions

View File

@ -2,7 +2,6 @@ package proto
import (
"service/api/base"
"service/bizcommon/util"
"service/dbstruct"
)
@ -75,18 +74,6 @@ type OpListResp struct {
Data *OpListData `json:"data"`
}
func (p *OpListReq) GetQueryArgs() (db string, tn string, status int64, offset int, limit int) {
if p == nil {
return "", "", 0, 0, 0
}
db = util.DerefString(p.AssociativeDatabase)
tn = util.DerefString(p.AssociativeTableName)
status = util.DerefInt64(p.Status)
offset = p.Offset
limit = p.Limit
return
}
// op 批量更新
type OpUpdateByIdsReq struct {
base.BaseRequest
@ -117,27 +104,3 @@ type OpReviewBatchResp struct {
base.BaseResponse
Data *OpReviewBatchData `json:"data"`
}
type OpListByCollectionInfosReq struct {
base.BaseRequest
AssociativeCollections []*CollectionInfo `json:"associative_collections"`
Status *int64 `json:"status"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
type OpListByCollectionInfosData struct {
List []*ImageAuditTaskVO `json:"list"`
Offset int `json:"offset"`
More int `json:"more"`
}
type OpListByCollectionInfosResp struct {
base.BaseResponse
Data *OpListByCollectionInfosData `json:"data"`
}
type CollectionInfo struct {
AssociativeDatabase string `json:"associative_data_base"`
AssociativeTableName string `json:"associative_table_name"`
}

View File

@ -3479,29 +3479,31 @@ func (m *Mongo) GetImageAuditTaskList(ctx *gin.Context, req *imageaudittaskproto
return list, err
}
func (m *Mongo) GetImageAuditTaskListByCollectionInfos(ctx *gin.Context, req *imageaudittaskproto.OpListByCollectionInfosReq) ([]*dbstruct.ImageAuditTask, error) {
func (m *Mongo) GetImageAuditTaskListByCollectionInfos(ctx *gin.Context, databases []string, tableNames []string, status int64, offset, limit int) ([]*dbstruct.ImageAuditTask, error) {
list := make([]*dbstruct.ImageAuditTask, 0)
if len(databases) != len(tableNames) {
return list, fmt.Errorf("length of databases is not equal to length of tableNames")
}
col := m.getColImageAuditTask()
query := qmgo.M{
"is_aligned": qmgo.M{
"$ne": consts.ImageAuditIsAligned_Yes,
},
"status": status,
"del_flag": 0,
}
if len(req.AssociativeCollections) > 0 {
orClause := make([]qmgo.M, 0)
for _, collectionInfo := range req.AssociativeCollections {
orClause = append(orClause, qmgo.M{
"associative_database": collectionInfo.AssociativeDatabase,
"associative_table_name": collectionInfo.AssociativeTableName,
})
}
query["$or"] = orClause
orClause := make([]qmgo.M, 0)
for i := range databases {
orClause = append(orClause, qmgo.M{
"associative_database": databases[i],
"associative_table_name": tableNames[i],
})
}
if req.Status != nil {
query["status"] = util.DerefInt64(req.Status)
}
err := col.Find(ctx, query).Sort("-ct").Skip(int64(req.Offset)).Limit(int64(req.Limit)).All(&list)
query["$or"] = orClause
err := col.Find(ctx, query).Sort("-ct").Skip(int64(offset)).Limit(int64(limit)).All(&list)
if err == qmgo.ErrNoSuchDocuments {
err = nil
return list, err
@ -3745,6 +3747,38 @@ func (m *Mongo) GetTextAuditTaskList(ctx *gin.Context, req *textaudittaskproto.O
return list, err
}
func (m *Mongo) GetTextAuditTaskListByCollectionInfos(ctx *gin.Context, databases []string, tableNames []string, status int64, offset, limit int) ([]*dbstruct.TextAuditTask, error) {
list := make([]*dbstruct.TextAuditTask, 0)
if len(databases) != len(tableNames) {
return list, fmt.Errorf("length of databases is not equal to length of tableNames")
}
col := m.getColTextAuditTask()
query := qmgo.M{
"is_aligned": qmgo.M{
"$ne": consts.TextAuditIsAligned_Yes,
},
"status": status,
"del_flag": 0,
}
orClause := make([]qmgo.M, 0)
for i := range databases {
orClause = append(orClause, qmgo.M{
"associative_database": databases[i],
"associative_table_name": tableNames[i],
})
}
query["$or"] = orClause
err := col.Find(ctx, query).Sort("-ct").Skip(int64(offset)).Limit(int64(limit)).All(&list)
if err == qmgo.ErrNoSuchDocuments {
err = nil
return list, err
}
return list, err
}
func (m *Mongo) GetTextAuditTaskListByIds(ctx *gin.Context, ids []string) ([]*dbstruct.TextAuditTask, error) {
list := make([]*dbstruct.TextAuditTask, 0)
col := m.getColTextAuditTask()
@ -5644,6 +5678,38 @@ func (m *Mongo) GetVideoModerationTaskList(ctx *gin.Context, req *video_moderati
return list, err
}
func (m *Mongo) GetVideoModerationTaskListByCollectionInfos(ctx *gin.Context, databases []string, tableNames []string, status int64, offset, limit int) ([]*dbstruct.VideoModerationTask, error) {
list := make([]*dbstruct.VideoModerationTask, 0)
if len(databases) != len(tableNames) {
return list, fmt.Errorf("length of databases is not equal to length of tableNames")
}
col := m.getColVideoModerationTask()
query := qmgo.M{
"is_aligned": qmgo.M{
"$ne": consts.VideoModerationIsAligned_Yes,
},
"status": status,
"del_flag": 0,
}
orClause := make([]qmgo.M, 0)
for i := range databases {
orClause = append(orClause, qmgo.M{
"associative_database": databases[i],
"associative_table_name": tableNames[i],
})
}
query["$or"] = orClause
err := col.Find(ctx, query).Sort("-ct").Skip(int64(offset)).Limit(int64(limit)).All(&list)
if err == qmgo.ErrNoSuchDocuments {
err = nil
return list, err
}
return list, err
}
func (m *Mongo) GetVideoModerationTaskListByIds(ctx *gin.Context, ids []string) ([]*dbstruct.VideoModerationTask, error) {
list := make([]*dbstruct.VideoModerationTask, 0)
col := m.getColVideoModerationTask()

View File

@ -1,4 +1,133 @@
package logic
type ContentAuditTaskService interface {
import (
"fmt"
"service/api/consts"
imageaudittaskproto "service/api/proto/imageaudittask/proto"
textaudittaskproto "service/api/proto/textaudittask/proto"
videomoderationtaskproto "service/api/proto/video_moderation_task/proto"
"service/apollostruct"
"service/bizcommon/util"
"service/dbstruct"
"service/library/apollo"
"service/library/logger"
"github.com/gin-gonic/gin"
)
type ImageAuditTaskDecorator struct {
ImageAuditTask *ImageAuditTask
}
func (p *ImageAuditTaskDecorator) OpList(ctx *gin.Context, req *imageaudittaskproto.OpListReq) ([]*dbstruct.ImageAuditTask, error) {
// 获取映射表
cfg := apollostruct.AuditTaskCollectionReflectCfg{}
err := apollo.GetJson(consts.AuditTaskCollectionReflectKey, &cfg, apollo.ApolloOpts().SetNamespace("application"))
if err != nil {
logger.Error("Apollo read failed : %v", err)
return make([]*dbstruct.ImageAuditTask, 0), err
}
// 将查询的集合信息进行映射,决定查询方式
key := fmt.Sprintf("%v|%v", util.DerefString(req.AssociativeDatabase), util.DerefString(req.AssociativeTableName))
collectionInfos, ok := cfg.Map[key]
var queryFunc func(ctx *gin.Context, req *imageaudittaskproto.OpListReq) ([]*dbstruct.ImageAuditTask, error)
if !ok || len(collectionInfos) == 0 {
queryFunc = func(ctx *gin.Context, req *imageaudittaskproto.OpListReq) ([]*dbstruct.ImageAuditTask, error) {
// 仅查询未对齐的任务
req.NotAlignedOpt = 1
return p.ImageAuditTask.OpList(ctx, req)
}
} else {
queryFunc = func(ctx *gin.Context, req *imageaudittaskproto.OpListReq) ([]*dbstruct.ImageAuditTask, error) {
databases := make([]string, 0)
tableNames := make([]string, 0)
for _, collectionInfo := range collectionInfos {
databases = append(databases, collectionInfo.Database)
tableNames = append(tableNames, collectionInfo.TableName)
}
return p.ImageAuditTask.OpListByCollectionInfos(ctx, databases, tableNames, util.DerefInt64(req.Status), req.Offset, req.Limit)
}
}
return queryFunc(ctx, req)
}
type TextAuditTaskDecorator struct {
TextAuditTask *TextAuditTask
}
func (p *TextAuditTaskDecorator) OpList(ctx *gin.Context, req *textaudittaskproto.OpListReq) ([]*dbstruct.TextAuditTask, error) {
// 获取映射表
cfg := apollostruct.AuditTaskCollectionReflectCfg{}
err := apollo.GetJson(consts.AuditTaskCollectionReflectKey, &cfg, apollo.ApolloOpts().SetNamespace("application"))
if err != nil {
logger.Error("Apollo read failed : %v", err)
return make([]*dbstruct.TextAuditTask, 0), err
}
// 将查询的集合信息进行映射,决定查询方式
key := fmt.Sprintf("%v|%v", util.DerefString(req.AssociativeDatabase), util.DerefString(req.AssociativeTableName))
collectionInfos, ok := cfg.Map[key]
var queryFunc func(ctx *gin.Context, req *textaudittaskproto.OpListReq) ([]*dbstruct.TextAuditTask, error)
if !ok || len(collectionInfos) == 0 {
queryFunc = func(ctx *gin.Context, req *textaudittaskproto.OpListReq) ([]*dbstruct.TextAuditTask, error) {
// 仅查询未对齐的任务
req.NotAlignedOpt = 1
return p.TextAuditTask.OpList(ctx, req)
}
} else {
queryFunc = func(ctx *gin.Context, req *textaudittaskproto.OpListReq) ([]*dbstruct.TextAuditTask, error) {
databases := make([]string, 0)
tableNames := make([]string, 0)
for _, collectionInfo := range collectionInfos {
databases = append(databases, collectionInfo.Database)
tableNames = append(tableNames, collectionInfo.TableName)
}
return p.TextAuditTask.OpListByCollectionInfos(ctx, databases, tableNames, util.DerefInt64(req.Status), req.Offset, req.Limit)
}
}
return queryFunc(ctx, req)
}
type VideoModerationTaskDecorator struct {
VideoModerationTask *VideoModerationTask
}
func (p *VideoModerationTaskDecorator) OpList(ctx *gin.Context, req *videomoderationtaskproto.OpListReq) ([]*dbstruct.VideoModerationTask, error) {
// 获取映射表
cfg := apollostruct.AuditTaskCollectionReflectCfg{}
err := apollo.GetJson(consts.AuditTaskCollectionReflectKey, &cfg, apollo.ApolloOpts().SetNamespace("application"))
if err != nil {
logger.Error("Apollo read failed : %v", err)
return make([]*dbstruct.VideoModerationTask, 0), err
}
// 将查询的集合信息进行映射,决定查询方式
key := fmt.Sprintf("%v|%v", util.DerefString(req.AssociativeDatabase), util.DerefString(req.AssociativeTableName))
collectionInfos, ok := cfg.Map[key]
var queryFunc func(ctx *gin.Context, req *videomoderationtaskproto.OpListReq) ([]*dbstruct.VideoModerationTask, error)
if !ok || len(collectionInfos) == 0 {
queryFunc = func(ctx *gin.Context, req *videomoderationtaskproto.OpListReq) ([]*dbstruct.VideoModerationTask, error) {
// 仅查询未对齐的任务
req.NotAlignedOpt = 1
return p.VideoModerationTask.OpList(ctx, req)
}
} else {
queryFunc = func(ctx *gin.Context, req *videomoderationtaskproto.OpListReq) ([]*dbstruct.VideoModerationTask, error) {
databases := make([]string, 0)
tableNames := make([]string, 0)
for _, collectionInfo := range collectionInfos {
databases = append(databases, collectionInfo.Database)
tableNames = append(tableNames, collectionInfo.TableName)
}
return p.VideoModerationTask.OpListByCollectionInfos(ctx, databases, tableNames, util.DerefInt64(req.Status), req.Offset, req.Limit)
}
}
return queryFunc(ctx, req)
}

View File

@ -65,8 +65,8 @@ func (p *ImageAuditTask) OpList(ctx *gin.Context, req *imageaudittaskproto.OpLis
return list, nil
}
func (p *ImageAuditTask) OpListByCollectionInfos(ctx *gin.Context, req *imageaudittaskproto.OpListByCollectionInfosReq) ([]*dbstruct.ImageAuditTask, error) {
list, err := p.store.GetImageAuditTaskListByCollectionInfos(ctx, req)
func (p *ImageAuditTask) OpListByCollectionInfos(ctx *gin.Context, databases []string, tableNames []string, status int64, offset, limit int) ([]*dbstruct.ImageAuditTask, error) {
list, err := p.store.GetImageAuditTaskListByCollectionInfos(ctx, databases, tableNames, status, offset, limit)
if err != nil {
logger.Error("GetImageAuditTaskListByCollectionInfos fail, err: %v", err)
return make([]*dbstruct.ImageAuditTask, 0), err

View File

@ -65,6 +65,15 @@ func (p *TextAuditTask) OpList(ctx *gin.Context, req *textaudittaskproto.OpListR
return list, nil
}
func (p *TextAuditTask) OpListByCollectionInfos(ctx *gin.Context, databases []string, tableNames []string, status int64, offset, limit int) ([]*dbstruct.TextAuditTask, error) {
list, err := p.store.GetTextAuditTaskListByCollectionInfos(ctx, databases, tableNames, status, offset, limit)
if err != nil {
logger.Error("GetTextAuditTaskListByCollectionInfos fail, err: %v", err)
return make([]*dbstruct.TextAuditTask, 0), err
}
return list, nil
}
func (p *TextAuditTask) OpUpdateByBatchId(ctx *gin.Context, batchId string, textaudittask *dbstruct.TextAuditTask) error {
err := p.store.UpdateTextAuditTaskByBatchId(ctx, batchId, textaudittask)
if err != nil {

View File

@ -65,6 +65,15 @@ func (p *VideoModerationTask) OpList(ctx *gin.Context, req *video_moderation_tas
return list, nil
}
func (p *VideoModerationTask) OpListByCollectionInfos(ctx *gin.Context, databases []string, tableNames []string, status int64, offset, limit int) ([]*dbstruct.VideoModerationTask, error) {
list, err := p.store.GetVideoModerationTaskListByCollectionInfos(ctx, databases, tableNames, status, offset, limit)
if err != nil {
logger.Error("GetVideoModerationTaskListByCollectionInfos fail, err: %v", err)
return make([]*dbstruct.VideoModerationTask, 0), err
}
return list, nil
}
func (p *VideoModerationTask) OpUpdateByBatchId(ctx *gin.Context, batchId string, video_moderation_task *dbstruct.VideoModerationTask) error {
err := p.store.UpdateVideoModerationTaskByBatchId(ctx, batchId, video_moderation_task)
if err != nil {

View File

@ -3161,44 +3161,10 @@ func (s *Service) OpGetImageAuditTaskVOList(ctx *gin.Context, req *imageaudittas
return
}
// 获取映射表
cfg := apollostruct.AuditTaskCollectionReflectCfg{}
err := apollo.GetJson(consts.AuditTaskCollectionReflectKey, &cfg, apollo.ApolloOpts().SetNamespace("application"))
if err != nil {
logger.Error("Apollo read failed : %v", err)
return make([]*imageaudittaskproto.ImageAuditTaskVO, 0), errcode.ErrCodeApolloReadFail
imageAuditTaskProxy := &logic.ImageAuditTaskDecorator{
ImageAuditTask: _DefaultImageAuditTask,
}
// 将查询的集合信息进行映射,决定查询方式
key := fmt.Sprintf("%v|%v", util.DerefString(req.AssociativeDatabase), util.DerefString(req.AssociativeTableName))
collectionInfos, ok := cfg.Map[key]
var queryFunc func(ctx *gin.Context, req *imageaudittaskproto.OpListReq) ([]*dbstruct.ImageAuditTask, error)
if !ok || len(collectionInfos) == 0 {
queryFunc = func(ctx *gin.Context, req *imageaudittaskproto.OpListReq) ([]*dbstruct.ImageAuditTask, error) {
// 仅查询未对齐的任务
req.NotAlignedOpt = 1
return _DefaultImageAuditTask.OpList(ctx, req)
}
} else {
queryFunc = func(ctx *gin.Context, req *imageaudittaskproto.OpListReq) ([]*dbstruct.ImageAuditTask, error) {
associativeCollections := make([]*imageaudittaskproto.CollectionInfo, 0)
for _, collectionInfo := range collectionInfos {
associativeCollections = append(associativeCollections, &imageaudittaskproto.CollectionInfo{
AssociativeDatabase: collectionInfo.Database,
AssociativeTableName: collectionInfo.TableName,
})
}
return _DefaultImageAuditTask.OpListByCollectionInfos(ctx, &imageaudittaskproto.OpListByCollectionInfosReq{
BaseRequest: req.BaseRequest,
AssociativeCollections: associativeCollections,
Status: req.Status,
Offset: req.Offset,
Limit: req.Limit,
})
}
}
list, err := queryFunc(ctx, req)
list, err := imageAuditTaskProxy.OpList(ctx, req)
if err != nil {
logger.Error("OpGetImageAuditTaskListByMid fail, req: %v, err: %v", util.ToJson(req), err)
ec = errcode.ErrCodeImageAuditTaskSrvFail
@ -3287,10 +3253,10 @@ func (s *Service) OpGetTextAuditTaskVOList(ctx *gin.Context, req *textaudittaskp
return
}
// 仅查询未对齐的任务
req.NotAlignedOpt = 1
list, err := _DefaultTextAuditTask.OpList(ctx, req)
textAuditTaskProxy := &logic.TextAuditTaskDecorator{
TextAuditTask: _DefaultTextAuditTask,
}
list, err := textAuditTaskProxy.OpList(ctx, req)
if err != nil {
logger.Error("OpGetTextAuditTaskListByMid fail, req: %v, err: %v", util.ToJson(req), err)
ec = errcode.ErrCodeTextAuditTaskSrvFail
@ -3376,9 +3342,11 @@ func (s *Service) OpGetVideoModerationTaskVOList(ctx *gin.Context, req *videomod
ec = errcode.ErrCodeVideoModerationTaskSrvOk
// 仅查询未对齐的任务
req.NotAlignedOpt = 1
videoModerationTaskProxy := &logic.VideoModerationTaskDecorator{
VideoModerationTask: _DefaultVideoModerationTask,
}
list, err := _DefaultVideoModerationTask.OpList(ctx, req)
list, err := videoModerationTaskProxy.OpList(ctx, req)
if err != nil {
logger.Error("OpGetVideoModerationTaskListByMid fail, req: %v, err: %v", util.ToJson(req), err)
ec = errcode.ErrCodeVideoModerationTaskSrvFail