diff --git a/app/mix/cmd/main.go b/app/mix/cmd/main.go index 4501b933..40a36a5d 100644 --- a/app/mix/cmd/main.go +++ b/app/mix/cmd/main.go @@ -9,13 +9,9 @@ import ( "service/api/consts" "service/app/mix/conf" "service/app/mix/controller" - "service/app/mix/mediafiller" "service/app/mix/service" "service/library/apollo" "service/library/configcenter" - "service/library/contentaudit/imageaudit" - "service/library/contentaudit/textaudit" - videomoderation "service/library/contentaudit/video_moderation" "service/library/dingtalk" "service/library/httpengine" "service/library/httpserver" @@ -85,7 +81,6 @@ func main() { // 初始化服务 service.DefaultService = service.NewService() service.DefaultConfigService = service.NewConfigService() - service.DefaultCronService = service.NewCronService() service.DefaultScriptsService = service.NewScriptsService() service.DefaultImageAuditTaskResultHandler = service.NewImageAuditTaskResultHandler() service.DefaultTextAuditTaskResultHandler = service.NewTextAuditTaskResultHandler() @@ -95,29 +90,6 @@ func main() { msg := fmt.Sprintf("Service init fail, err: %v", err) PrintAndExit(msg) } - err = service.DefaultCronService.Init(cfg) - if err != nil { - msg := fmt.Sprintf("CronService init fail, err: %v", err) - PrintAndExit(msg) - } - - // 连接到审核任务数据库接口 - service.DefaultService.ConnectToImageAudit() - service.DefaultService.ConnectToTextAudit() - service.DefaultService.ConnectToVideoModeration() - //service.DefaultService.Run() - - // 启动图像审核服务 - imageaudit.Init(cfg.ImageAudit) - - // 启动文字审核服务 - textaudit.Init(cfg.TextAudit) - - // 启动视频审核服务 - videomoderation.Init(cfg.VideoModeration) - - // 初始化媒体填充服务 - mediafiller.Init(cfg.ServerInfo) // 初始化http server ip := GetIp() diff --git a/app/mix/cmd_offline/main.go b/app/mix/cmd_offline/main.go new file mode 100644 index 00000000..41f2afb3 --- /dev/null +++ b/app/mix/cmd_offline/main.go @@ -0,0 +1,133 @@ +package main + +import ( + "fmt" + "net" + "os" + "runtime" + "service/api/consts" + "service/app/mix/conf" + "service/app/mix/mediafiller" + "service/app/mix/service" + "service/library/apollo" + "service/library/configcenter" + "service/library/contentaudit/imageaudit" + "service/library/contentaudit/textaudit" + videomoderation "service/library/contentaudit/video_moderation" + "service/library/dingtalk" + "service/library/logger" + "service/library/redis" + "service/library/sms" + "strings" + "time" +) + +func main() { + // 加载配置 + // 1.默认配置 + configPath := consts.MainConfigPath + + //if os.Getenv("PROJECT_ENV") == "production" { + // configPath = consts.ProductionConfigPath + //} + cfg := new(conf.ConfigSt) + err := configcenter.LoadConfig(configPath, cfg) + if err != nil { + msg := fmt.Sprintf("LoadConfig fail, path: %v, err: %v", configPath, err) + PrintAndExit(msg) + } + + //初始化apollo + err = apollo.Init(cfg.Apollo) + if err != nil { + msg := fmt.Sprintf("Apollo init fail, path: %v, err: %v", configPath, err) + PrintAndExit(msg) + } + + //初始化redis + err = redis.Init(cfg.RedisConfig) + if err != nil { + fmt.Printf("Redis init fail, path: %v, err: %v", configPath, err) + return + } + + // 初始化日志 + logger.InitLogger(cfg.Log) + + // 初始化短信服务 + err = sms.Init(cfg.Dysmsapi) + if err != nil { + msg := fmt.Sprintf("Sms init fail, path: %v, err: %v", configPath, err) + PrintAndExit(msg) + } + + // 初始化钉钉机器人 + dingtalk.InitDefaultDingTalkClient(cfg.DingTalkRobot) + + // 初始化服务 + service.DefaultService = service.NewService() + service.DefaultConfigService = service.NewConfigService() + service.DefaultCronService = service.NewCronService() + service.DefaultScriptsService = service.NewScriptsService() + service.DefaultImageAuditTaskResultHandler = service.NewImageAuditTaskResultHandler() + service.DefaultTextAuditTaskResultHandler = service.NewTextAuditTaskResultHandler() + service.DefaultVideoModerationTaskResultHandler = service.NewVideoModerationTaskResultHandler() + err = service.DefaultService.Init(cfg) + if err != nil { + msg := fmt.Sprintf("Service init fail, err: %v", err) + PrintAndExit(msg) + } + exec, err := service.DefaultCronService.Init(cfg) + if err != nil { + msg := fmt.Sprintf("CronService init fail, err: %v", err) + PrintAndExit(msg) + } + + // 连接到审核任务数据库接口 + service.DefaultService.ConnectToImageAudit() + service.DefaultService.ConnectToTextAudit() + service.DefaultService.ConnectToVideoModeration() + + // 启动图像审核服务 + imageaudit.Init(cfg.ImageAudit) + + // 启动文字审核服务 + textaudit.Init(cfg.TextAudit) + + // 启动视频审核服务 + videomoderation.Init(cfg.VideoModeration) + + // 初始化媒体填充服务 + mediafiller.Init(cfg.ServerInfo) + + exec.Run() +} + +func PrintAndExit(msg string) { + _, file, line, _ := runtime.Caller(1) + _, _ = fmt.Fprintf(os.Stderr, "file %s, line %d, %s\n", file, line, msg) + time.Sleep(1) //wait logger flush + os.Exit(1) +} + +func GetIp() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + PrintAndExit("get ip fail") + return "127.0.0.1" + } + retIp := "" + for _, address := range addrs { + // 检查ip地址判断是否回环地址 + if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + ip := ipnet.IP.String() + if strings.HasPrefix(ip, "172.") { + retIp = ip + break + } + } + } + } + return retIp +} diff --git a/app/mix/dao/mongo_idseq.go b/app/mix/dao/mongo_idseq.go index b282eb26..ab01dc16 100644 --- a/app/mix/dao/mongo_idseq.go +++ b/app/mix/dao/mongo_idseq.go @@ -57,6 +57,11 @@ const ( DBZoneCollaboratorIdSeq = "zone_collaborator_id_seq" COLZoneCollaboratorIdSeq = "zone_collaborator_id_seq" + + DBContentAuditRTI = "content_audit_rti" + COLImageAuditRTI = "image_audit_rti" + COLTextAuditRTI = "text_audit_rti" + COLVideoModerationRTI = "video_moderation_rti" ) // UserIdSeq序列表 @@ -139,6 +144,21 @@ func (m *Mongo) getColZoneCollaboratorIdSeq() *qmgo.Collection { return m.clientMix.Database(DBZoneCollaboratorIdSeq).Collection(COLZoneCollaboratorIdSeq) } +// 图像审核任务批次号表 +func (m *Mongo) getColImageAuditRTI() *qmgo.Collection { + return m.clientMix.Database(DBContentAuditRTI).Collection(COLImageAuditRTI) +} + +// 文字审核任务批次号表 +func (m *Mongo) getColTextAuditRTI() *qmgo.Collection { + return m.clientMix.Database(DBContentAuditRTI).Collection(COLTextAuditRTI) +} + +// 视频审核任务批次号表 +func (m *Mongo) getColVideoModerationRTI() *qmgo.Collection { + return m.clientMix.Database(DBContentAuditRTI).Collection(COLVideoModerationRTI) +} + // account_id发号器 func (m *Mongo) GetAndUpdateAccountIdSeq(ctx *gin.Context) (accountIdSeq *dbstruct.AccountIdSeq, err error) { col := m.getColAccountIdSeq() @@ -453,3 +473,99 @@ func (m *Mongo) GetAndUpdateZoneCollaboratorIdSeq(ctx *gin.Context) (zoneCollabo return &zoneCollaboratorIdSeqInstance, err } + +// 图像审核任务批次号 +func (m *Mongo) GetAndUpdateImageAuditBatchId(ctx *gin.Context, batchId string) (imageAuditBatchId *dbstruct.ImageAuditBatchId, err error) { + col := m.getColImageAuditRTI() + + change := qmgo.Change{ + Update: qmgo.M{"$set": qmgo.M{"batch_id": batchId}}, + Upsert: true, + ReturnNew: false, + } + + imageAuditBatchIdInstance := dbstruct.ImageAuditBatchId{} + if err = col.Find(ctx, qmgo.M{"_id": "batch_id"}).Apply(change, &imageAuditBatchIdInstance); err != nil { + logger.Error("change error : %v", err) + return + } + + return &imageAuditBatchIdInstance, err +} + +// 文字审核任务批次号 +func (m *Mongo) GetAndUpdateTextAuditBatchId(ctx *gin.Context, batchId string) (textAuditBatchId *dbstruct.TextAuditBatchId, err error) { + col := m.getColTextAuditRTI() + + change := qmgo.Change{ + Update: qmgo.M{"$set": qmgo.M{"batch_id": batchId}}, + Upsert: true, + ReturnNew: false, + } + + textAuditBatchIdInstance := dbstruct.TextAuditBatchId{} + if err = col.Find(ctx, qmgo.M{"_id": "batch_id"}).Apply(change, &textAuditBatchIdInstance); err != nil { + logger.Error("change error : %v", err) + return + } + + return &textAuditBatchIdInstance, err +} + +// 视频审核任务批次号 +func (m *Mongo) GetAndUpdateVideoModerationBatchId(ctx *gin.Context, batchId string) (videoModerationBatchId *dbstruct.VideoModerationBatchId, err error) { + col := m.getColVideoModerationRTI() + + change := qmgo.Change{ + Update: qmgo.M{"$set": qmgo.M{"batch_id": batchId}}, + Upsert: true, + ReturnNew: false, + } + + videoModerationBatchIdInstance := dbstruct.VideoModerationBatchId{} + if err = col.Find(ctx, qmgo.M{"_id": "batch_id"}).Apply(change, &videoModerationBatchIdInstance); err != nil { + logger.Error("change error : %v", err) + return + } + + return &videoModerationBatchIdInstance, err +} + +// 图像审核任务批次号 +func (m *Mongo) GetImageAuditBatchId(ctx *gin.Context) (imageAuditBatchId *dbstruct.ImageAuditBatchId, err error) { + col := m.getColImageAuditRTI() + + imageAuditBatchId = &dbstruct.ImageAuditBatchId{} + if err = col.Find(ctx, qmgo.M{"_id": "batch_id"}).One(imageAuditBatchId); err != nil { + logger.Error("find error : %v", err) + return + } + + return +} + +// 文字审核任务批次号 +func (m *Mongo) GetTextAuditBatchId(ctx *gin.Context) (textAuditBatchId *dbstruct.TextAuditBatchId, err error) { + col := m.getColTextAuditRTI() + + textAuditBatchId = &dbstruct.TextAuditBatchId{} + if err = col.Find(ctx, qmgo.M{"_id": "batch_id"}).One(&textAuditBatchId); err != nil { + logger.Error("find error : %v", err) + return + } + + return +} + +// 视频审核任务批次号 +func (m *Mongo) GetVideoModerationBatchId(ctx *gin.Context) (videoModerationBatchId *dbstruct.VideoModerationBatchId, err error) { + col := m.getColVideoModerationRTI() + + videoModerationBatchId = &dbstruct.VideoModerationBatchId{} + if err = col.Find(ctx, qmgo.M{"_id": "batch_id"}).One(&videoModerationBatchId); err != nil { + logger.Error("find error : %v", err) + return + } + + return +} diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index e6d96b0b..67f9fb2d 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -36,9 +36,6 @@ import ( "service/bizcommon/util" "service/dbstruct" "service/library/apollo" - "service/library/contentaudit/imageaudit" - "service/library/contentaudit/textaudit" - videomoderation "service/library/contentaudit/video_moderation" "service/library/logger" "time" @@ -348,8 +345,8 @@ func (s *Service) ApiUpdateAccount(ctx *gin.Context, req *accountproto.ApiUpdate } // 创建审核任务 - imageaudittasks := s.CreateUpdateAccountImageAudit(ctx, oldAccount, req.Account) - textaudittasks := s.CreateUpdateAccountTextAudit(ctx, oldAccount, req.Account) + s.CreateUpdateAccountImageAudit(ctx, oldAccount, req.Account) + s.CreateUpdateAccountTextAudit(ctx, oldAccount, req.Account) // 用户只允许修改昵称和头像 if err := _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{ @@ -375,10 +372,6 @@ func (s *Service) ApiUpdateAccount(ctx *gin.Context, req *accountproto.ApiUpdate } } - // 审核任务加入队列 - imageaudit.AddTasks(imageaudittasks) - textaudit.AddTasks(textaudittasks) - return } @@ -1033,8 +1026,8 @@ func (s *Service) ApiUpdateStreamer(ctx *gin.Context, req *streamerproto.ApiUpda } // 创建审核任务 - imageaudittasks := s.CreateUpdateStreamerImageAudit(ctx, oldStreamer, req.Streamer) - textaudittasks := s.CreateUpdateStreamerTextAudit(ctx, oldStreamer, req.Streamer) + s.CreateUpdateStreamerImageAudit(ctx, oldStreamer, req.Streamer) + s.CreateUpdateStreamerTextAudit(ctx, oldStreamer, req.Streamer) //执行更新 if err := _DefaultStreamer.OpUpdate(ctx, &streamerproto.OpUpdateReq{ @@ -1073,9 +1066,6 @@ func (s *Service) ApiUpdateStreamer(ctx *gin.Context, req *streamerproto.ApiUpda return } - // 审核任务加入队列 - imageaudit.AddTasks(imageaudittasks) - textaudit.AddTasks(textaudittasks) return } @@ -1687,8 +1677,6 @@ func (s *Service) ApiCreateMoment(ctx *gin.Context, req *momentproto.ApiCreateRe // 添加审核任务 imageaudittasks := s.CreateMomentImageAudit(ctx, req.Moment) textaudittasks := s.CreateMomentTextAudit(ctx, oldMoment, req.Moment) - imageaudit.AddTasks(imageaudittasks) - textaudit.AddTasks(textaudittasks) // 封装动态审核任务 momentAuditTask := &dbstruct.MomentAuditTask{ @@ -1762,8 +1750,6 @@ func (s *Service) ApiUpdateMoment(ctx *gin.Context, req *momentproto.ApiUpdateRe // 添加审核任务 imageaudittasks := s.CreateMomentImageAudit(ctx, req.Moment) textaudittasks := s.CreateMomentTextAudit(ctx, oldMoment, req.Moment) - imageaudit.AddTasks(imageaudittasks) - textaudit.AddTasks(textaudittasks) // 封装动态审核任务 momentAuditTask := &dbstruct.MomentAuditTask{ @@ -2480,12 +2466,9 @@ func (s *Service) ApiCreateZoneMoment(ctx *gin.Context, req *zonemomentproto.Api } // 添加审核任务 - imageaudittasks := s.CreateZoneMomentImageAudit(ctx, req.ZoneMoment) - textaudittasks := s.CreateZoneMomentTextAudit(ctx, req.ZoneMoment) - videomoderationtasks := s.CreateZoneMomentVideoModeration(ctx, req.ZoneMoment) - imageaudit.AddTasks(imageaudittasks) - textaudit.AddTasks(textaudittasks) - videomoderation.AddTasks(videomoderationtasks) + s.CreateZoneMomentImageAudit(ctx, req.ZoneMoment) + s.CreateZoneMomentTextAudit(ctx, req.ZoneMoment) + s.CreateZoneMomentVideoModeration(ctx, req.ZoneMoment) // 设置价格 if req.ZoneMoment.GetCType() == consts.ZoneMomentCType_Paid { @@ -2587,12 +2570,9 @@ func (s *Service) ApiUpdateZoneMoment(ctx *gin.Context, req *zonemomentproto.Api } // 添加审核任务 - imageaudittasks := s.CreateZoneMomentImageAudit(ctx, req.ZoneMoment) - textaudittasks := s.CreateZoneMomentTextAudit(ctx, req.ZoneMoment) - videomoderationtasks := s.CreateZoneMomentVideoModeration(ctx, req.ZoneMoment) - imageaudit.AddTasks(imageaudittasks) - textaudit.AddTasks(textaudittasks) - videomoderation.AddTasks(videomoderationtasks) + s.CreateZoneMomentImageAudit(ctx, req.ZoneMoment) + s.CreateZoneMomentTextAudit(ctx, req.ZoneMoment) + s.CreateZoneMomentVideoModeration(ctx, req.ZoneMoment) // 设置价格 if req.ZoneMoment.GetCType() == consts.ZoneMomentCType_Paid { diff --git a/app/mix/service/cronservice.go b/app/mix/service/cronservice.go index 036afb4c..47da5039 100644 --- a/app/mix/service/cronservice.go +++ b/app/mix/service/cronservice.go @@ -23,7 +23,7 @@ func NewCronService() *CronService { return new(CronService) } -func (s *CronService) Init(c any) (err error) { +func (s *CronService) Init(c any) (exec xxl.Executor, err error) { cfg, ok := c.(*conf.ConfigSt) if !ok { err = errors.New("cfg struct type not expected") @@ -33,7 +33,7 @@ func (s *CronService) Init(c any) (err error) { s.fileAbsPath = cfg.Log.FileAbsPath - exec := xxl.NewExecutor( + exec = xxl.NewExecutor( xxl.ServerAddr(cfg.XxlJob.ServerAddr), //xxl-job-admin部署地址 xxl.AccessToken(cfg.XxlJob.AccessToken), //请求令牌(默认为空) xxl.ExecutorIp(cfg.XxlJob.ExecutorIp), //可自动获取 @@ -65,7 +65,6 @@ func (s *CronService) Init(c any) (err error) { exec.LogHandler(customLogHandle) //注册任务handler - go exec.Run() return } diff --git a/app/mix/service/imageauditservice.go b/app/mix/service/imageauditservice.go index 15ff841a..0f4d69c0 100644 --- a/app/mix/service/imageauditservice.go +++ b/app/mix/service/imageauditservice.go @@ -2,7 +2,12 @@ package service import ( "service/api/consts" + "service/bizcommon/util" "service/dbstruct" + "service/library/logger" + + imageauditproto "service/api/proto/imageaudit/proto" + imageaudittaskproto "service/api/proto/imageaudittask/proto" "github.com/gin-gonic/gin" goproto "google.golang.org/protobuf/proto" @@ -25,6 +30,8 @@ func (s *Service) CreateUpdateAccountImageAudit(ctx *gin.Context, oldAccount *db OldMedia: oldAccount.Avatar, }) + addImageAuditTasks(ctx, tasks) + return } @@ -55,6 +62,8 @@ func (s *Service) CreateUpdateStreamerImageAudit(ctx *gin.Context, oldStreamer * }) } + addImageAuditTasks(ctx, tasks) + return } @@ -73,6 +82,8 @@ func (s *Service) CreateMomentImageAudit(ctx *gin.Context, newMoment *dbstruct.M }) } + addImageAuditTasks(ctx, tasks) + return } @@ -91,5 +102,115 @@ func (s *Service) CreateZoneMomentImageAudit(ctx *gin.Context, newZoneMoment *db }) } + addImageAuditTasks(ctx, tasks) + + return +} + +func addImageAuditTasks(ctx *gin.Context, tasks []*dbstruct.ImageAuditTask) error { + for _, task := range tasks { + err := addImageAuditTask(ctx, task) + if err != nil { + return err + } + } + return nil +} + +func addImageAuditTask(ctx *gin.Context, task *dbstruct.ImageAuditTask) error { + if task == nil || task.AuditedMedia == nil { + return nil + } + fragmentNum := len(task.AuditedMedia.GetImageIds()) + if fragmentNum == 0 { + return nil + } + + batchId, err := _DefaultContentAuditRTI.GetImageAuditBatchId(ctx) + if err != nil { + return err + } + + task.BatchId = goproto.String(batchId) + task.Status = goproto.Int64(consts.ImageAudit_Created) + + if fragmentNum == 1 { + task.IsFragmented = goproto.Int64(0) + task.FragmentsNum = goproto.Int64(1) + + // 写入图像审核表 + if err := prepareNotFragmentedImageAuditTask(ctx, task); err != nil { + return err + } + } else { + task.IsFragmented = goproto.Int64(1) + task.FragmentsNum = goproto.Int64(int64(fragmentNum)) + + // 写入图像审核表 + if err := prepareFragmentedImageAuditTask(ctx, task); err != nil { + return err + } + } + + // 写入图像审核任务表 + if err := _DefaultImageAuditTask.OpCreate(ctx, &imageaudittaskproto.OpCreateReq{ + ImageAuditTask: task, + }); err != nil { + logger.Error("Imageaudittask OpCreate failed: %v", err) + return err + } + + return nil +} + +// 根据任务信息,创建单个图片的图片审核,并存入数据库,返回images +func prepareFragmentedImageAuditTask(ctx *gin.Context, task *dbstruct.ImageAuditTask) (err error) { + + fragmentsNum := util.DerefInt64(task.FragmentsNum) + imageAudits := make([]*dbstruct.ImageAudit, fragmentsNum) + + // 将媒体拆分为单个图像并存入图片审核表 + imageIds := task.AuditedMedia.GetImageIds() + for i, imageId := range imageIds { + image := &dbstruct.MediaComponent{ + ImageIds: util.Int64Slice([]int64{imageId}), + } + imageAudits[i] = &dbstruct.ImageAudit{ + AuditedMedia: image, + BatchId: task.BatchId, + Status: goproto.Int64(consts.ImageAudit_Created), + } + } + if err = _DefaultImageAudit.OpCreateFragmentGroup(ctx, &imageauditproto.OpCreateBatchReq{ + ImageAudits: imageAudits, + FragmentsNum: fragmentsNum, + }); err != nil { + logger.Error("Imageaudit OpCreateFragmentGroup failed: %v", err) + return + } + + auditFragmentIds := make([]string, fragmentsNum) + for i, imageaudit := range imageAudits { + auditFragmentIds[i] = util.DerefString(imageaudit.Id) + } + task.ImageAuditFragmentIds = &auditFragmentIds + return +} + +func prepareNotFragmentedImageAuditTask(ctx *gin.Context, task *dbstruct.ImageAuditTask) (err error) { + + imageAudit := &dbstruct.ImageAudit{ + AuditedMedia: task.AuditedMedia, + BatchId: task.BatchId, + Status: goproto.Int64(consts.ImageAudit_Created), + } + if err = _DefaultImageAudit.OpCreate(ctx, &imageauditproto.OpCreateReq{ + ImageAudit: imageAudit, + }); err != nil { + logger.Error("Imageaudit OpCreate failed: %v", err) + return + } + + task.ImageAuditId = imageAudit.Id return } diff --git a/app/mix/service/logic/content_audit_rti.go b/app/mix/service/logic/content_audit_rti.go new file mode 100644 index 00000000..02b1e64f --- /dev/null +++ b/app/mix/service/logic/content_audit_rti.go @@ -0,0 +1,85 @@ +package logic + +import ( + "service/app/mix/dao" + "service/library/logger" + + "github.com/gin-gonic/gin" +) + +type ContentAuditRTI struct { + store *dao.Store +} + +func NewContentAuditRTI(store *dao.Store) (a *ContentAuditRTI) { + a = &ContentAuditRTI{ + store: store, + } + return +} + +// 图像审核任务批次号 +func (m *ContentAuditRTI) GetAndUpdateImageAuditBatchId(ctx *gin.Context, batchId string) (string, error) { + + imageAuditBatchId, err := m.store.GetAndUpdateImageAuditBatchId(ctx, batchId) + if err != nil { + logger.Error("GetAndUpdateImageAuditBatchId fail, err: %v", err) + return "", err + } + return imageAuditBatchId.BatchId, nil +} + +// 文字审核任务批次号 +func (m *ContentAuditRTI) GetAndUpdateTextAuditBatchId(ctx *gin.Context, batchId string) (string, error) { + + textAuditBatchId, err := m.store.GetAndUpdateTextAuditBatchId(ctx, batchId) + if err != nil { + logger.Error("GetAndUpdateTextAuditBatchId fail, err: %v", err) + return "", err + } + return textAuditBatchId.BatchId, nil +} + +// 视频审核任务批次号 +func (m *ContentAuditRTI) GetAndUpdateVideoModerationBatchId(ctx *gin.Context, batchId string) (string, error) { + + videoModerationBatchId, err := m.store.GetAndUpdateVideoModerationBatchId(ctx, batchId) + if err != nil { + logger.Error("GetAndUpdateVideoModerationBatchId fail, err: %v", err) + return "", err + } + return videoModerationBatchId.BatchId, nil +} + +// 图像审核任务批次号 +func (m *ContentAuditRTI) GetImageAuditBatchId(ctx *gin.Context) (string, error) { + + imageAuditBatchId, err := m.store.GetImageAuditBatchId(ctx) + if err != nil { + logger.Error("GetImageAuditBatchId fail, err: %v", err) + return "", err + } + return imageAuditBatchId.BatchId, nil +} + +// 文字审核任务批次号 +func (m *ContentAuditRTI) GetTextAuditBatchId(ctx *gin.Context) (string, error) { + + textAuditBatchId, err := m.store.GetTextAuditBatchId(ctx) + if err != nil { + logger.Error("GetTextAuditBatchId fail, err: %v", err) + return "", err + } + return textAuditBatchId.BatchId, nil +} + +// 视频审核任务批次号 +func (m *ContentAuditRTI) GetVideoModerationBatchId(ctx *gin.Context) (string, error) { + + videoModerationBatchId, err := m.store.GetVideoModerationBatchId(ctx) + if err != nil { + logger.Error("GetVideoModerationBatchId fail, err: %v", err) + return "", err + } + return videoModerationBatchId.BatchId, nil +} diff --git a/app/mix/service/service.go b/app/mix/service/service.go index e802127d..b0ca23e8 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -127,6 +127,7 @@ var ( _DefaultVideoModerationTask *logic.VideoModerationTask _DefaultStreamerAcct *logic.StreamerAcct _DefaultDailyStatementZoneInfo *logic.DailyStatementZoneInfo + _DefaultContentAuditRTI *logic.ContentAuditRTI ) type Service struct { @@ -216,6 +217,7 @@ func (s *Service) Init(c any) (err error) { _DefaultVideoModeration = logic.NewVideoModeration(store) _DefaultVideoModerationTask = logic.NewVideoModerationTask(store) _DefaultDailyStatementZoneInfo = logic.NewDailyStatementZoneInfo(store) + _DefaultContentAuditRTI = logic.NewContentAuditRTI(store) _DefaultVas = logic.NewVas(store, _DefaultStreamer, _DefaultAccount, _DefaultZone, _DefaultZoneThirdPartner, _DefaultZoneCollaborator) _DefaultStreamerAcct = logic.NewStreamerAcct(store) @@ -241,18 +243,21 @@ func (s *Service) Stop() { func (s *Service) ConnectToImageAudit() { imageaudit.ConnectToImageAuditService(_DefaultImageAudit) imageaudit.ConnectToImageAuditTaskService(_DefaultImageAuditTask, DefaultImageAuditTaskResultHandler) + imageaudit.ConnectToContentAuditRTIService(_DefaultContentAuditRTI) } // 文字审核数据库接口 func (s *Service) ConnectToTextAudit() { textaudit.ConnectToTextAuditService(_DefaultTextAudit) textaudit.ConnectToTextAuditTaskService(_DefaultTextAuditTask, DefaultTextAuditTaskResultHandler) + textaudit.ConnectToContentAuditRTIService(_DefaultContentAuditRTI) } // 视频审核数据库接口 func (s *Service) ConnectToVideoModeration() { videomoderation.ConnectToVideoModerationService(_DefaultVideoModeration) videomoderation.ConnectToVideoModerationTaskService(_DefaultVideoModerationTask, DefaultVideoModerationTaskResultHandler) + videomoderation.ConnectToContentAuditRTIService(_DefaultContentAuditRTI) } // Product @@ -801,7 +806,7 @@ func (s *Service) OpUpdateAccount(ctx *gin.Context, req *accountproto.OpUpdateRe } // 创建审核任务 - textaudittasks := s.CreateUpdateAccountTextAudit(ctx, oldAccount, req.Account) + s.CreateUpdateAccountTextAudit(ctx, oldAccount, req.Account) // 运营只可操作用户的昵称 err = _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{ @@ -827,8 +832,6 @@ func (s *Service) OpUpdateAccount(ctx *gin.Context, req *accountproto.OpUpdateRe } } - // 审核任务加入队列 - textaudit.AddTasks(textaudittasks) return } @@ -1083,8 +1086,6 @@ func (s *Service) OpCreateMoment(ctx *gin.Context, req *momentproto.OpCreateReq) // 添加审核任务 imageaudittasks := s.CreateMomentImageAudit(ctx, req.Moment) textaudittasks := s.CreateMomentTextAudit(ctx, oldMoment, req.Moment) - imageaudit.AddTasks(imageaudittasks) - textaudit.AddTasks(textaudittasks) // 封装动态审核任务 momentAuditTask := &dbstruct.MomentAuditTask{ @@ -1148,8 +1149,6 @@ func (s *Service) OpUpdateMoment(ctx *gin.Context, req *momentproto.OpUpdateReq) // 添加审核任务 imageaudittasks := s.CreateMomentImageAudit(ctx, req.Moment) textaudittasks := s.CreateMomentTextAudit(ctx, oldMoment, req.Moment) - imageaudit.AddTasks(imageaudittasks) - textaudit.AddTasks(textaudittasks) // 封装动态审核任务 momentAuditTask := &dbstruct.MomentAuditTask{ @@ -3361,12 +3360,9 @@ func (s *Service) OpCreateZoneMoment(ctx *gin.Context, req *zonemomentproto.OpCr } // 添加审核任务 - imageaudittasks := s.CreateZoneMomentImageAudit(ctx, req.ZoneMoment) - textaudittasks := s.CreateZoneMomentTextAudit(ctx, req.ZoneMoment) - videomoderationtasks := s.CreateZoneMomentVideoModeration(ctx, req.ZoneMoment) - imageaudit.AddTasks(imageaudittasks) - textaudit.AddTasks(textaudittasks) - videomoderation.AddTasks(videomoderationtasks) + s.CreateZoneMomentImageAudit(ctx, req.ZoneMoment) + s.CreateZoneMomentTextAudit(ctx, req.ZoneMoment) + s.CreateZoneMomentVideoModeration(ctx, req.ZoneMoment) // 设置价格 if req.ZoneMoment.GetCType() == consts.ZoneMomentCType_Paid { @@ -3460,12 +3456,9 @@ func (s *Service) OpUpdateZoneMoment(ctx *gin.Context, req *zonemomentproto.OpUp } // 添加审核任务 - imageaudittasks := s.CreateZoneMomentImageAudit(ctx, req.ZoneMoment) - textaudittasks := s.CreateZoneMomentTextAudit(ctx, req.ZoneMoment) - videomoderationtasks := s.CreateZoneMomentVideoModeration(ctx, req.ZoneMoment) - imageaudit.AddTasks(imageaudittasks) - textaudit.AddTasks(textaudittasks) - videomoderation.AddTasks(videomoderationtasks) + s.CreateZoneMomentImageAudit(ctx, req.ZoneMoment) + s.CreateZoneMomentTextAudit(ctx, req.ZoneMoment) + s.CreateZoneMomentVideoModeration(ctx, req.ZoneMoment) return } diff --git a/app/mix/service/textauditservice.go b/app/mix/service/textauditservice.go index 0142d10b..7bb9335b 100644 --- a/app/mix/service/textauditservice.go +++ b/app/mix/service/textauditservice.go @@ -3,6 +3,10 @@ package service import ( "service/api/consts" "service/dbstruct" + "service/library/logger" + + textauditproto "service/api/proto/textaudit/proto" + textaudittaskproto "service/api/proto/textaudittask/proto" "github.com/gin-gonic/gin" goproto "google.golang.org/protobuf/proto" @@ -25,6 +29,8 @@ func (s *Service) CreateUpdateAccountTextAudit(ctx *gin.Context, oldAccount *dbs OldText: oldAccount.Name, }) + addTextAuditTasks(ctx, tasks) + return } @@ -57,6 +63,8 @@ func (s *Service) CreateUpdateStreamerTextAudit(ctx *gin.Context, oldStreamer *d }) } + addTextAuditTasks(ctx, tasks) + return } @@ -75,6 +83,8 @@ func (s *Service) CreateMomentTextAudit(ctx *gin.Context, oldMoment *dbstruct.Mo }) } + addTextAuditTasks(ctx, tasks) + return } @@ -92,5 +102,55 @@ func (s *Service) CreateZoneMomentTextAudit(ctx *gin.Context, newZoneMoment *dbs }) } + addTextAuditTasks(ctx, tasks) + return } + +func addTextAuditTasks(ctx *gin.Context, tasks []*dbstruct.TextAuditTask) error { + for _, task := range tasks { + err := addTextAuditTask(ctx, task) + if err != nil { + return err + } + } + return nil +} + +func addTextAuditTask(ctx *gin.Context, task *dbstruct.TextAuditTask) error { + if task == nil || task.AuditedText == nil { + return nil + } + + batchId, err := _DefaultContentAuditRTI.GetTextAuditBatchId(ctx) + if err != nil { + return err + } + + task.BatchId = goproto.String(batchId) + task.Status = goproto.Int64(consts.TextAudit_Created) + + // 1.写入文字审核表 + textAudit := &dbstruct.TextAudit{ + AuditedText: task.AuditedText, + BatchId: task.BatchId, + Status: goproto.Int64(consts.TextAudit_Created), + } + if err := _DefaultTextAudit.OpCreate(ctx, &textauditproto.OpCreateReq{ + TextAudit: textAudit, + }); err != nil { + logger.Error("Textaudit OpCreate failed: %v", err) + return err + } + + task.TextAuditId = textAudit.Id + + // 2.写入文字审核任务表 + if err := _DefaultTextAuditTask.OpCreate(&gin.Context{}, &textaudittaskproto.OpCreateReq{ + TextAuditTask: task, + }); err != nil { + logger.Error("Textaudittask OpCreate failed: %v", err) + return err + } + return nil +} diff --git a/app/mix/service/video_moderation_service.go b/app/mix/service/video_moderation_service.go index e8db5a5b..c8d9becc 100644 --- a/app/mix/service/video_moderation_service.go +++ b/app/mix/service/video_moderation_service.go @@ -2,7 +2,12 @@ package service import ( "service/api/consts" + "service/bizcommon/util" "service/dbstruct" + "service/library/logger" + + video_moderation_proto "service/api/proto/video_moderation/proto" + video_moderation_task_proto "service/api/proto/video_moderation_task/proto" "github.com/gin-gonic/gin" goproto "google.golang.org/protobuf/proto" @@ -23,5 +28,115 @@ func (s *Service) CreateZoneMomentVideoModeration(ctx *gin.Context, newZoneMomen }) } + addVideoModerationTasks(ctx, tasks) + + return +} + +func addVideoModerationTasks(ctx *gin.Context, tasks []*dbstruct.VideoModerationTask) error { + for _, task := range tasks { + err := addVideoModerationTask(ctx, task) + if err != nil { + return err + } + } + return nil +} + +func addVideoModerationTask(ctx *gin.Context, task *dbstruct.VideoModerationTask) error { + if task == nil || task.AuditedMedia == nil { + return nil + } + fragmentNum := len(task.AuditedMedia.GetVideoIds()) + if fragmentNum == 0 { + return nil + } + + batchId, err := _DefaultContentAuditRTI.GetVideoModerationBatchId(ctx) + if err != nil { + return err + } + + task.BatchId = goproto.String(batchId) + task.Status = goproto.Int64(consts.VideoModeration_Created) + + if fragmentNum == 1 { + task.IsFragmented = goproto.Int64(0) + task.FragmentsNum = goproto.Int64(1) + + // 写入视频审核表 + if err := prepareNotFragmentedVideoModerationTask(ctx, task); err != nil { + return err + } + } else { + task.IsFragmented = goproto.Int64(1) + task.FragmentsNum = goproto.Int64(int64(fragmentNum)) + + // 写入视频审核表 + if err := prepareFragmentedVideoModerationTask(ctx, task); err != nil { + return err + } + } + + // 写入视频审核任务表 + if err := _DefaultVideoModerationTask.OpCreate(&gin.Context{}, &video_moderation_task_proto.OpCreateReq{ + VideoModerationTask: task, + }); err != nil { + logger.Error("Videomoderationtask OpCreate failed: %v", err) + return err + } + + return nil +} + +// 根据任务信息,创建单个视频的视频审核,并存入数据库 +func prepareFragmentedVideoModerationTask(ctx *gin.Context, task *dbstruct.VideoModerationTask) (err error) { + + fragmentsNum := task.GetFragmentsNum() + videoModerations := make([]*dbstruct.VideoModeration, fragmentsNum) + + // 将媒体拆分为单个视频并存入视频审核表 + imageIds := task.AuditedMedia.GetVideoIds() + for i, imageId := range imageIds { + image := &dbstruct.MediaComponent{ + VideoIds: util.Int64Slice([]int64{imageId}), + } + videoModerations[i] = &dbstruct.VideoModeration{ + AuditedMedia: image, + BatchId: task.BatchId, + Status: goproto.Int64(consts.VideoModeration_Created), + } + } + if err = _DefaultVideoModeration.OpCreateFragmentGroup(ctx, &video_moderation_proto.OpCreateBatchReq{ + VideoModerations: videoModerations, + FragmentsNum: fragmentsNum, + }); err != nil { + logger.Error("Videomoderation OpCreateFragmentGroup failed: %v", err) + return + } + + moderationFragmentIds := make([]string, fragmentsNum) + for i, videoModeration := range videoModerations { + moderationFragmentIds[i] = videoModeration.GetId() + } + task.VideoModerationFragmentIds = &moderationFragmentIds + return +} + +func prepareNotFragmentedVideoModerationTask(ctx *gin.Context, task *dbstruct.VideoModerationTask) (err error) { + + videoModeration := &dbstruct.VideoModeration{ + AuditedMedia: task.AuditedMedia, + BatchId: task.BatchId, + Status: goproto.Int64(consts.VideoModeration_Created), + } + if err = _DefaultVideoModeration.OpCreate(ctx, &video_moderation_proto.OpCreateReq{ + VideoModeration: videoModeration, + }); err != nil { + logger.Error("Videomoderation OpCreate failed: %v", err) + return + } + + task.VideoModerationId = videoModeration.Id return } diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go index 024cfbfb..454e039e 100644 --- a/app/mix/service/xxljob_tasks.go +++ b/app/mix/service/xxljob_tasks.go @@ -192,7 +192,10 @@ func (s *CronService) ImageAuditBatch(ctx context.Context, param *xxl.RunReq) (m handleMsg := &strings.Builder{} // 刷新批次号 - batchId := imageaudit.RefreshBatchId() + batchId, err := imageaudit.RefreshBatchId() + if err != nil { + return fmt.Sprintf("Refresh batchId failed, err: %v", err) + } // 执行图像审核 successNum, failNum, err := imageaudit.Run(batchId) if err != nil { @@ -237,7 +240,11 @@ func (s *CronService) TextAuditBatch(ctx context.Context, param *xxl.RunReq) (ms handleMsg := &strings.Builder{} // 刷新批次号 - batchId := textaudit.RefreshBatchId() + batchId, err := textaudit.RefreshBatchId() + if err != nil { + return fmt.Sprintf("Refresh batchId failed, err: %v", err) + } + // 执 // 执行文字审核 successNum, failNum, err := textaudit.Run(batchId) if err != nil { @@ -508,14 +515,17 @@ func (s *CronService) ClearWrongPswdTimes(ctx context.Context, param *xxl.RunReq return "wrong_pswd_times has been cleared" } -// 图像审核作业 +// 视频审核作业 func (s *CronService) VideoModerationBatch(ctx context.Context, param *xxl.RunReq) (msg string) { logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) handleMsg := &strings.Builder{} // 刷新批次号 - batchId := videomoderation.RefreshBatchId() - // 执行图像审核 + batchId, err := videomoderation.RefreshBatchId() + if err != nil { + return fmt.Sprintf("Refresh batchId failed, err: %v", err) + } + // 执行视频审核 successNum, failNum, err := videomoderation.Run(batchId) if err != nil { handleMsg.WriteString(fmt.Sprintf("ERROR : batchId : %v, video moderation tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchId, successNum, failNum, err)) diff --git a/dbstruct/idSeq.go b/dbstruct/idSeq.go index 4cbb13c0..6be66020 100644 --- a/dbstruct/idSeq.go +++ b/dbstruct/idSeq.go @@ -68,3 +68,15 @@ type ZoneMomentIdSeq struct { type ZoneCollaboratorIdSeq struct { Seq int64 //用户Id序列号 } + +type ImageAuditBatchId struct { + BatchId string `json:"batch_id" bson:"batch_id"` //图像审核批次号 +} + +type TextAuditBatchId struct { + BatchId string `json:"batch_id" bson:"batch_id"` //文字审核批次号 +} + +type VideoModerationBatchId struct { + BatchId string `json:"batch_id" bson:"batch_id"` //视频审核批次号 +} diff --git a/library/contentaudit/imageaudit/client.go b/library/contentaudit/imageaudit/client.go index 7746c515..12763802 100644 --- a/library/contentaudit/imageaudit/client.go +++ b/library/contentaudit/imageaudit/client.go @@ -38,6 +38,10 @@ type ImageAuditTaskService interface { OpHandleOverdue(ctx *gin.Context, task *dbstruct.ImageAuditTask, batchId string) error } +type ContentAuditRTIService interface { + GetAndUpdateImageAuditBatchId(ctx *gin.Context, batchId string) (string, error) +} + type ImageAuditTaskResultHandler interface { Handle(ctx *gin.Context, task *dbstruct.ImageAuditTask, option int) error } @@ -46,6 +50,7 @@ var defaultImageAuditClient *imageaudit.Client var _DefaultImageAudit ImageAuditService var _DefaultImageAuditTask ImageAuditTaskService var _DefaultResultHandler ImageAuditTaskResultHandler +var _DefaultContentAuditRTI ContentAuditRTIService var scenes []*string func Init(cfg *configcenter.ImageAuditConfig) (err error) { @@ -71,19 +76,16 @@ func Init(cfg *configcenter.ImageAuditConfig) (err error) { scenes[i] = tea.String(scene) } - // 初始化调度器 - initScheduler(cfg) - - // // 启动调度任务 - // defaultImageAuditTaskScheduler.Run() + // batch_id + _, err = RefreshBatchId() + if err != nil { + logger.Error("RefreshBatchId fail, cfg: %v, err: %v", util.ToJson(config), err) + return + } return } -// func GiveNoticeToBatch() { -// defaultImageAuditTaskScheduler.GiveNoticeToBatch() -// } - func ConnectToImageAuditService(serivce ImageAuditService) { _DefaultImageAudit = serivce } @@ -92,3 +94,7 @@ func ConnectToImageAuditTaskService(service ImageAuditTaskService, handler Image _DefaultImageAuditTask = service _DefaultResultHandler = handler } + +func ConnectToContentAuditRTIService(service ContentAuditRTIService) { + _DefaultContentAuditRTI = service +} diff --git a/library/contentaudit/imageaudit/imageaudit.go b/library/contentaudit/imageaudit/imageaudit.go index 1c9aaf7a..b247281c 100644 --- a/library/contentaudit/imageaudit/imageaudit.go +++ b/library/contentaudit/imageaudit/imageaudit.go @@ -19,10 +19,13 @@ import ( ) // 刷新批次号 -func RefreshBatchId() string { - batchId := defaultImageAuditTaskScheduler.batchId - defaultImageAuditTaskScheduler.batchId = genereteBatchId() - return batchId +func RefreshBatchId() (string, error) { + batchId, err := _DefaultContentAuditRTI.GetAndUpdateImageAuditBatchId(&gin.Context{}, genereteBatchId()) + if err != nil { + logger.Info("_DefaultContentAuditRTI GetAndUpdateImageAuditBatchId fail: %v", err) + return "", err + } + return batchId, nil } // 图像审核主逻辑 diff --git a/library/contentaudit/imageaudit/task.go b/library/contentaudit/imageaudit/task.go deleted file mode 100644 index 81e2eb42..00000000 --- a/library/contentaudit/imageaudit/task.go +++ /dev/null @@ -1,118 +0,0 @@ -package imageaudit - -import ( - "service/api/consts" - imageauditproto "service/api/proto/imageaudit/proto" - imageaudittaskproto "service/api/proto/imageaudittask/proto" - "service/bizcommon/util" - "service/dbstruct" - "service/library/logger" - - "github.com/gin-gonic/gin" - goproto "google.golang.org/protobuf/proto" -) - -func AddTasks(tasks []*dbstruct.ImageAuditTask) error { - for _, task := range tasks { - err := AddTask(task) - if err != nil { - return err - } - } - return nil -} - -func AddTask(task *dbstruct.ImageAuditTask) error { - if task == nil || task.AuditedMedia == nil { - return nil - } - fragmentNum := len(task.AuditedMedia.GetImageIds()) - if fragmentNum == 0 { - return nil - } - - task.BatchId = goproto.String(defaultImageAuditTaskScheduler.batchId) - task.Status = goproto.Int64(consts.ImageAudit_Created) - - if fragmentNum == 1 { - task.IsFragmented = goproto.Int64(0) - task.FragmentsNum = goproto.Int64(1) - - // 写入图像审核表 - if err := prepareNotFragmentedImageAuditTask(task); err != nil { - return err - } - } else { - task.IsFragmented = goproto.Int64(1) - task.FragmentsNum = goproto.Int64(int64(fragmentNum)) - - // 写入图像审核表 - if err := prepareFragmentedImageAuditTask(task); err != nil { - return err - } - } - - // 写入图像审核任务表 - if err := _DefaultImageAuditTask.OpCreate(&gin.Context{}, &imageaudittaskproto.OpCreateReq{ - ImageAuditTask: task, - }); err != nil { - logger.Error("Imageaudittask OpCreate failed: %v", err) - return err - } - - return nil -} - -// 根据任务信息,创建单个图片的图片审核,并存入数据库,返回images -func prepareFragmentedImageAuditTask(task *dbstruct.ImageAuditTask) (err error) { - ctx := &gin.Context{} - - fragmentsNum := util.DerefInt64(task.FragmentsNum) - imageAudits := make([]*dbstruct.ImageAudit, fragmentsNum) - - // 将媒体拆分为单个图像并存入图片审核表 - imageIds := task.AuditedMedia.GetImageIds() - for i, imageId := range imageIds { - image := &dbstruct.MediaComponent{ - ImageIds: util.Int64Slice([]int64{imageId}), - } - imageAudits[i] = &dbstruct.ImageAudit{ - AuditedMedia: image, - BatchId: task.BatchId, - Status: goproto.Int64(consts.ImageAudit_Created), - } - } - if err = _DefaultImageAudit.OpCreateFragmentGroup(ctx, &imageauditproto.OpCreateBatchReq{ - ImageAudits: imageAudits, - FragmentsNum: fragmentsNum, - }); err != nil { - logger.Error("Imageaudit OpCreateFragmentGroup failed: %v", err) - return - } - - auditFragmentIds := make([]string, fragmentsNum) - for i, imageaudit := range imageAudits { - auditFragmentIds[i] = util.DerefString(imageaudit.Id) - } - task.ImageAuditFragmentIds = &auditFragmentIds - return -} - -func prepareNotFragmentedImageAuditTask(task *dbstruct.ImageAuditTask) (err error) { - ctx := &gin.Context{} - - imageAudit := &dbstruct.ImageAudit{ - AuditedMedia: task.AuditedMedia, - BatchId: task.BatchId, - Status: goproto.Int64(consts.ImageAudit_Created), - } - if err = _DefaultImageAudit.OpCreate(ctx, &imageauditproto.OpCreateReq{ - ImageAudit: imageAudit, - }); err != nil { - logger.Error("Imageaudit OpCreate failed: %v", err) - return - } - - task.ImageAuditId = imageAudit.Id - return -} diff --git a/library/contentaudit/imageaudit/taskscheduler.go b/library/contentaudit/imageaudit/taskscheduler.go index 7ee484ee..74d4b710 100644 --- a/library/contentaudit/imageaudit/taskscheduler.go +++ b/library/contentaudit/imageaudit/taskscheduler.go @@ -2,24 +2,9 @@ package imageaudit import ( "fmt" - "service/library/configcenter" "time" ) -var defaultImageAuditTaskScheduler *ImageAuditTaskScheduler - -// 图像审核任务调度器 -type ImageAuditTaskScheduler struct { - // 状态记录 - batchId string // 当前批次号 -} - -func initScheduler(cfg *configcenter.ImageAuditConfig) { - defaultImageAuditTaskScheduler = &ImageAuditTaskScheduler{ - batchId: genereteBatchId(), - } -} - // 生成批次号 func genereteBatchId() string { now := time.Now() diff --git a/library/contentaudit/textaudit/client.go b/library/contentaudit/textaudit/client.go index 8fc1a3b0..424cd3a0 100644 --- a/library/contentaudit/textaudit/client.go +++ b/library/contentaudit/textaudit/client.go @@ -4,6 +4,7 @@ import ( "fmt" "service/dbstruct" "service/library/configcenter" + "service/library/logger" "strings" textauditproto "service/api/proto/textaudit/proto" @@ -33,6 +34,10 @@ type TextAuditTaskService interface { OpHandleOverdue(ctx *gin.Context, task *dbstruct.TextAuditTask, batchId string) error } +type ContentAuditRTIService interface { + GetAndUpdateTextAuditBatchId(ctx *gin.Context, batchId string) (string, error) +} + type TextAuditTaskResultHandler interface { Handle(ctx *gin.Context, task *dbstruct.TextAuditTask, option int) error } @@ -41,6 +46,7 @@ var defaultTextAuditClient *textaudit.Client var _DefaultTextAudit TextAuditService var _DefaultTextAuditTask TextAuditTaskService var _DefaultResultHandler TextAuditTaskResultHandler +var _DefaultContentAuditRTI ContentAuditRTIService var labels []string func ConnectToTextAuditClient(client *textaudit.Client) { @@ -56,8 +62,11 @@ func Init(cfg *configcenter.TextAuditConfig) (err error) { // 初始化审核选项 labels = strings.Split(cfg.Labels, " ") - // 初始化调度器 - initScheduler(cfg) + _, err = RefreshBatchId() + if err != nil { + logger.Error("RefreshBatchId fail, err: %v", err) + return + } return } @@ -70,3 +79,7 @@ func ConnectToTextAuditTaskService(service TextAuditTaskService, handler TextAud _DefaultTextAuditTask = service _DefaultResultHandler = handler } + +func ConnectToContentAuditRTIService(service ContentAuditRTIService) { + _DefaultContentAuditRTI = service +} diff --git a/library/contentaudit/textaudit/control_block.go b/library/contentaudit/textaudit/control_block.go index 187acf2d..d7162273 100644 --- a/library/contentaudit/textaudit/control_block.go +++ b/library/contentaudit/textaudit/control_block.go @@ -2,13 +2,11 @@ package textaudit import ( "fmt" - "service/api/consts" "service/bizcommon/util" "service/dbstruct" "service/library/logger" "github.com/gin-gonic/gin" - goproto "google.golang.org/protobuf/proto" ) // 批次图像审核任务控制块 @@ -65,13 +63,6 @@ type TextAuditTaskControlBlock struct { // 新建文字审核任务块 func NewTextAuditTaskControlBlock(task *dbstruct.TextAuditTask) (tcb *TextAuditTaskControlBlock) { - if task == nil || task.AuditedText == nil { - return - } - - task.BatchId = goproto.String(defaultTextAuditTaskScheduler.batchId) - task.Status = goproto.Int64(consts.TextAudit_Created) - tcb = &TextAuditTaskControlBlock{ ActionId: fmt.Sprintf("%v%v%v%v", util.DerefString(task.AssociativeDatabase), util.DerefString(task.AssociativeTableName), util.DerefInt64(task.AssociativeTableId), util.DerefString(task.AssociativeTableColumn)), diff --git a/library/contentaudit/textaudit/task.go b/library/contentaudit/textaudit/task.go deleted file mode 100644 index 4e4168d0..00000000 --- a/library/contentaudit/textaudit/task.go +++ /dev/null @@ -1,57 +0,0 @@ -package textaudit - -import ( - "service/api/consts" - textauditproto "service/api/proto/textaudit/proto" - textaudittaskproto "service/api/proto/textaudittask/proto" - "service/dbstruct" - "service/library/logger" - - "github.com/gin-gonic/gin" - goproto "google.golang.org/protobuf/proto" -) - -func AddTasks(tasks []*dbstruct.TextAuditTask) error { - for _, task := range tasks { - err := AddTask(task) - if err != nil { - return err - } - } - return nil -} - -func AddTask(task *dbstruct.TextAuditTask) error { - if task == nil || task.AuditedText == nil { - return nil - } - - task.BatchId = goproto.String(defaultTextAuditTaskScheduler.batchId) - task.Status = goproto.Int64(consts.TextAudit_Created) - - // 1.写入文字审核表 - ctx := &gin.Context{} - - textAudit := &dbstruct.TextAudit{ - AuditedText: task.AuditedText, - BatchId: task.BatchId, - Status: goproto.Int64(consts.TextAudit_Created), - } - if err := _DefaultTextAudit.OpCreate(ctx, &textauditproto.OpCreateReq{ - TextAudit: textAudit, - }); err != nil { - logger.Error("Textaudit OpCreate failed: %v", err) - return err - } - - task.TextAuditId = textAudit.Id - - // 2.写入文字审核任务表 - if err := _DefaultTextAuditTask.OpCreate(&gin.Context{}, &textaudittaskproto.OpCreateReq{ - TextAuditTask: task, - }); err != nil { - logger.Error("Textaudittask OpCreate failed: %v", err) - return err - } - return nil -} diff --git a/library/contentaudit/textaudit/taskscheduler.go b/library/contentaudit/textaudit/taskscheduler.go index eb7c92e1..d0d10312 100644 --- a/library/contentaudit/textaudit/taskscheduler.go +++ b/library/contentaudit/textaudit/taskscheduler.go @@ -2,24 +2,9 @@ package textaudit import ( "fmt" - "service/library/configcenter" "time" ) -var defaultTextAuditTaskScheduler *TextAuditTaskScheduler - -// 文字审核任务调度器 -type TextAuditTaskScheduler struct { - // 状态记录 - batchId string // 当前批次号 -} - -func initScheduler(cfg *configcenter.TextAuditConfig) { - defaultTextAuditTaskScheduler = &TextAuditTaskScheduler{ - batchId: genereteBatchId(), - } -} - // 生成批次号 func genereteBatchId() string { now := time.Now() diff --git a/library/contentaudit/textaudit/textaudit.go b/library/contentaudit/textaudit/textaudit.go index 0af8ec6c..a6966516 100644 --- a/library/contentaudit/textaudit/textaudit.go +++ b/library/contentaudit/textaudit/textaudit.go @@ -17,10 +17,13 @@ import ( ) // 刷新批次号 -func RefreshBatchId() string { - batchId := defaultTextAuditTaskScheduler.batchId - defaultTextAuditTaskScheduler.batchId = genereteBatchId() - return batchId +func RefreshBatchId() (string, error) { + batchId, err := _DefaultContentAuditRTI.GetAndUpdateTextAuditBatchId(&gin.Context{}, genereteBatchId()) + if err != nil { + logger.Info("_DefaultContentAuditRTI GetAndUpdateTextAuditBatchId fail: %v", err) + return "", err + } + return batchId, nil } func Run(batchId string) (successNum int, failNum int, err error) { diff --git a/library/contentaudit/video_moderation/client.go b/library/contentaudit/video_moderation/client.go index ea1aaf37..d5c186f6 100644 --- a/library/contentaudit/video_moderation/client.go +++ b/library/contentaudit/video_moderation/client.go @@ -36,6 +36,10 @@ type VideoModerationTaskService interface { OpHandleOverdue(ctx *gin.Context, task *dbstruct.VideoModerationTask, batchId string) error } +type ContentAuditRTIService interface { + GetAndUpdateVideoModerationBatchId(ctx *gin.Context, batchId string) (string, error) +} + type VideoModerationTaskResultHandler interface { Handle(ctx *gin.Context, task *dbstruct.VideoModerationTask, option int) error } @@ -44,6 +48,7 @@ var defaultVideoModerationClient *green20220302.Client var _DefaultVideoModeration VideoModerationService var _DefaultVideoModerationTask VideoModerationTaskService var _DefaultResultHandler VideoModerationTaskResultHandler +var _DefaultContentAuditRTI ContentAuditRTIService var _defaultConfig *configcenter.VideoModerationConfig func Init(cfg *configcenter.VideoModerationConfig) (err error) { @@ -67,6 +72,12 @@ func Init(cfg *configcenter.VideoModerationConfig) (err error) { return } + _, err = RefreshBatchId() + if err != nil { + logger.Error("RefreshBatchId fail, cfg: %v, err: %v", util.ToJson(config), err) + return + } + // 初始化调度器 initScheduler(cfg) @@ -83,3 +94,7 @@ func ConnectToVideoModerationTaskService(service VideoModerationTaskService, han _DefaultVideoModerationTask = service _DefaultResultHandler = handler } + +func ConnectToContentAuditRTIService(service ContentAuditRTIService) { + _DefaultContentAuditRTI = service +} diff --git a/library/contentaudit/video_moderation/task.go b/library/contentaudit/video_moderation/task.go deleted file mode 100644 index fefd71bb..00000000 --- a/library/contentaudit/video_moderation/task.go +++ /dev/null @@ -1,118 +0,0 @@ -package videomoderation - -import ( - "service/api/consts" - video_moderation_proto "service/api/proto/video_moderation/proto" - video_moderation_task_proto "service/api/proto/video_moderation_task/proto" - "service/bizcommon/util" - "service/dbstruct" - "service/library/logger" - - "github.com/gin-gonic/gin" - goproto "google.golang.org/protobuf/proto" -) - -func AddTasks(tasks []*dbstruct.VideoModerationTask) error { - for _, task := range tasks { - err := AddTask(task) - if err != nil { - return err - } - } - return nil -} - -func AddTask(task *dbstruct.VideoModerationTask) error { - if task == nil || task.AuditedMedia == nil { - return nil - } - fragmentNum := len(task.AuditedMedia.GetVideoIds()) - if fragmentNum == 0 { - return nil - } - - task.BatchId = goproto.String(defaultVideoModerationTaskScheduler.batchId) - task.Status = goproto.Int64(consts.VideoModeration_Created) - - if fragmentNum == 1 { - task.IsFragmented = goproto.Int64(0) - task.FragmentsNum = goproto.Int64(1) - - // 写入视频审核表 - if err := prepareNotFragmentedVideoModerationTask(task); err != nil { - return err - } - } else { - task.IsFragmented = goproto.Int64(1) - task.FragmentsNum = goproto.Int64(int64(fragmentNum)) - - // 写入视频审核表 - if err := prepareFragmentedVideoModerationTask(task); err != nil { - return err - } - } - - // 写入视频审核任务表 - if err := _DefaultVideoModerationTask.OpCreate(&gin.Context{}, &video_moderation_task_proto.OpCreateReq{ - VideoModerationTask: task, - }); err != nil { - logger.Error("Videomoderationtask OpCreate failed: %v", err) - return err - } - - return nil -} - -// 根据任务信息,创建单个视频的视频审核,并存入数据库 -func prepareFragmentedVideoModerationTask(task *dbstruct.VideoModerationTask) (err error) { - ctx := &gin.Context{} - - fragmentsNum := task.GetFragmentsNum() - videoModerations := make([]*dbstruct.VideoModeration, fragmentsNum) - - // 将媒体拆分为单个视频并存入视频审核表 - imageIds := task.AuditedMedia.GetVideoIds() - for i, imageId := range imageIds { - image := &dbstruct.MediaComponent{ - VideoIds: util.Int64Slice([]int64{imageId}), - } - videoModerations[i] = &dbstruct.VideoModeration{ - AuditedMedia: image, - BatchId: task.BatchId, - Status: goproto.Int64(consts.VideoModeration_Created), - } - } - if err = _DefaultVideoModeration.OpCreateFragmentGroup(ctx, &video_moderation_proto.OpCreateBatchReq{ - VideoModerations: videoModerations, - FragmentsNum: fragmentsNum, - }); err != nil { - logger.Error("Videomoderation OpCreateFragmentGroup failed: %v", err) - return - } - - moderationFragmentIds := make([]string, fragmentsNum) - for i, videoModeration := range videoModerations { - moderationFragmentIds[i] = videoModeration.GetId() - } - task.VideoModerationFragmentIds = &moderationFragmentIds - return -} - -func prepareNotFragmentedVideoModerationTask(task *dbstruct.VideoModerationTask) (err error) { - ctx := &gin.Context{} - - videoModeration := &dbstruct.VideoModeration{ - AuditedMedia: task.AuditedMedia, - BatchId: task.BatchId, - Status: goproto.Int64(consts.VideoModeration_Created), - } - if err = _DefaultVideoModeration.OpCreate(ctx, &video_moderation_proto.OpCreateReq{ - VideoModeration: videoModeration, - }); err != nil { - logger.Error("Videomoderation OpCreate failed: %v", err) - return - } - - task.VideoModerationId = videoModeration.Id - return -} diff --git a/library/contentaudit/video_moderation/taskscheduler.go b/library/contentaudit/video_moderation/taskscheduler.go index a96eba00..a1ba9643 100644 --- a/library/contentaudit/video_moderation/taskscheduler.go +++ b/library/contentaudit/video_moderation/taskscheduler.go @@ -11,17 +11,13 @@ var defaultVideoModerationTaskScheduler *VideoModerationTaskScheduler // 图像审核任务调度器 type VideoModerationTaskScheduler struct { - // 状态记录 - batchId string // 当前批次号 - // 批次审核任务控制块 btcbMp map[string]*VideoModerationTaskBatchControlBlock } func initScheduler(cfg *configcenter.VideoModerationConfig) { defaultVideoModerationTaskScheduler = &VideoModerationTaskScheduler{ - batchId: genereteBatchId(), - btcbMp: make(map[string]*VideoModerationTaskBatchControlBlock), + btcbMp: make(map[string]*VideoModerationTaskBatchControlBlock), } } diff --git a/library/contentaudit/video_moderation/video_moderation.go b/library/contentaudit/video_moderation/video_moderation.go index aa5e0ffd..cce9bc3b 100644 --- a/library/contentaudit/video_moderation/video_moderation.go +++ b/library/contentaudit/video_moderation/video_moderation.go @@ -21,10 +21,13 @@ import ( ) // 刷新批次号 -func RefreshBatchId() string { - batchId := defaultVideoModerationTaskScheduler.batchId - defaultVideoModerationTaskScheduler.batchId = genereteBatchId() - return batchId +func RefreshBatchId() (string, error) { + batchId, err := _DefaultContentAuditRTI.GetAndUpdateVideoModerationBatchId(&gin.Context{}, genereteBatchId()) + if err != nil { + logger.Info("_DefaultContentAuditRTI GetAndUpdateVideoModerationBatchId fail: %v", err) + return "", err + } + return batchId, nil } // 视频审核主逻辑 @@ -106,7 +109,7 @@ func createVideoModerationTaskBatchControlBlock(tasks []*dbstruct.VideoModeratio } videos = append(videos, video) mediaFillables = append(mediaFillables, video) - dataIds = append(dataIds, defaultVideoModerationTaskScheduler.batchId+videoAuditIds[i]) + dataIds = append(dataIds, task.GetBatchId()+videoAuditIds[i]) batchTaskCtrlBlock.VidmodId2taskMap[videoAuditIds[i]] = tcb }