by Robin at 20240120; fix

This commit is contained in:
Leufolium 2024-01-20 18:48:38 +08:00
parent a52ac4dfa7
commit 9430918b7b
16 changed files with 352 additions and 308 deletions

View File

@ -4,3 +4,8 @@ const (
ImageAuditTaskUpdate_Pass = 0
ImageAuditTaskUpdate_Rollback = 1
)
const (
TextAuditTaskUpdate_Pass = 0
TextAuditTaskUpdate_Rollback = 1
)

View File

@ -71,6 +71,7 @@ func main() {
service.DefaultCronService = service.NewCronService()
service.DefaultScriptsService = service.NewScriptsService()
service.DefaultImageAuditTaskResultHandler = service.NewImageAuditTaskResultHandler()
service.DefaultTextAuditTaskResultHandler = service.NewTextAuditTaskResultHandler()
err = service.DefaultService.Init(cfg)
if err != nil {
msg := fmt.Sprintf("Service init fail, err: %v", err)

View File

@ -332,7 +332,7 @@ func (s *Service) ApiUpdateAccount(ctx *gin.Context, req *accountproto.ApiUpdate
// 创建审核任务
imageaudittasks := s.CreateUpdateAccountImageAudit(ctx, oldAccount, req.Account)
textaudittask := s.CreateUpdateAccountTextAudit(ctx, oldAccount, req.Account)
textaudittasks := s.CreateUpdateAccountTextAudit(ctx, oldAccount, req.Account)
// 用户只允许修改昵称和头像
if err := _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{
@ -349,9 +349,7 @@ func (s *Service) ApiUpdateAccount(ctx *gin.Context, req *accountproto.ApiUpdate
// 审核任务加入队列
imageaudit.AddTasks(imageaudittasks)
if textaudittask != nil {
textaudit.AddTask(textaudittask)
}
textaudit.AddTasks(textaudittasks)
return
}
@ -942,16 +940,8 @@ func (s *Service) ApiUpdateStreamer(ctx *gin.Context, req *streamerproto.ApiUpda
}
// 审核任务加入队列
for i := range imageaudittasks {
if imageaudittasks[i] != nil {
imageaudit.AddTask(imageaudittasks[i])
}
}
for i := range textaudittasks {
if textaudittasks[i] != nil {
textaudit.AddTask(textaudittasks[i])
}
}
imageaudit.AddTasks(imageaudittasks)
textaudit.AddTasks(textaudittasks)
return
}

View File

@ -49,13 +49,12 @@ func (s *CronService) Init(c any) (err error) {
exec.RegTask("create_daily_statement", s.CreateDailyStatement)
exec.RegTask("image_audit_batch", s.ImageAuditBatch)
exec.RegTask("image_audit_batch_his", s.ImageAuditBatchHis)
exec.RegTask("text_audit_batch", s.TextAuditBatch)
exec.RegTask("text_audit_batch_his", s.TextAuditBatchHis)
exec.LogHandler(customLogHandle)
//注册任务handler
if err = exec.Run(); err != nil {
logger.Error("xxl-job executor init fail: %v", err)
return
}
go exec.Run()
return
}

View File

@ -183,7 +183,7 @@ func (s *Service) ConnectToImageAudit() {
imageaudit.ConnectToImageAuditService(_DefaultImageAudit)
imageaudit.ConnectToImageAuditTaskService(_DefaultImageAuditTask, DefaultImageAuditTaskResultHandler)
textaudit.ConnectToTextAuditService(_DefaultTextAudit)
textaudit.ConnectToTextAuditTaskService(_DefaultTextAuditTask)
textaudit.ConnectToTextAuditTaskService(_DefaultTextAuditTask, DefaultTextAuditTaskResultHandler)
}
// Product
@ -732,7 +732,7 @@ func (s *Service) OpUpdateAccount(ctx *gin.Context, req *accountproto.OpUpdateRe
}
// 创建审核任务
textaudittask := s.CreateUpdateAccountTextAudit(ctx, oldAccount, req.Account)
textaudittasks := s.CreateUpdateAccountTextAudit(ctx, oldAccount, req.Account)
// 运营只可操作用户的昵称
err = _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{
@ -748,9 +748,7 @@ func (s *Service) OpUpdateAccount(ctx *gin.Context, req *accountproto.OpUpdateRe
}
// 审核任务加入队列
if textaudittask != nil {
textaudit.AddTask(textaudittask)
}
textaudit.AddTasks(textaudittasks)
return
}
@ -2435,7 +2433,7 @@ func (s *Service) OpPassTextAuditTaskBatch(ctx *gin.Context, req *textaudittaskp
ids := make([]string, 0)
for i, task := range req.TextAuditTasks {
if err := s.GetUpdateTextFunc(ctx, task)(); err != nil {
if err := DefaultTextAuditTaskResultHandler.Handle(ctx, task, consts.TextAuditTaskUpdate_Pass); err != nil {
logger.Error("Execution of update function of %dth task fail: %v", i, err)
ec = errcode.ErrCodeTextAuditTaskManuallyPassFail
break

View File

@ -1,32 +1,20 @@
package service
import (
accountproto "service/api/proto/account/proto"
streamerproto "service/api/proto/streamer/proto"
"service/app/mix/dao"
"service/bizcommon/util"
"service/dbstruct"
"service/library/contentaudit/textaudit"
"github.com/gin-gonic/gin"
goproto "google.golang.org/protobuf/proto"
)
func (s *Service) CreateUpdateAccountTextAudit(ctx *gin.Context, oldAccount *dbstruct.Account, newAccount *dbstruct.Account) (task *textaudit.TextAuditTaskControlBlock) {
func (s *Service) CreateUpdateAccountTextAudit(ctx *gin.Context, oldAccount *dbstruct.Account, newAccount *dbstruct.Account) (tasks []*dbstruct.TextAuditTask) {
tasks = make([]*dbstruct.TextAuditTask, 0)
if newAccount.Name == nil {
return
}
rollBackFunc := func() error {
return _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{
Account: &dbstruct.Account{
Mid: oldAccount.Mid,
Name: oldAccount.Name,
},
})
}
task = textaudit.NewTextAuditTaskControlBlock(&dbstruct.TextAuditTask{
tasks = append(tasks, &dbstruct.TextAuditTask{
RouteUrl: goproto.String(ctx.Request.URL.Path),
AssociativeDatabase: goproto.String("account"),
AssociativeTableName: goproto.String("account"),
@ -34,25 +22,17 @@ func (s *Service) CreateUpdateAccountTextAudit(ctx *gin.Context, oldAccount *dbs
AssociativeTableColumn: goproto.String("name"),
AuditedText: newAccount.Name,
OldText: oldAccount.Name,
}, rollBackFunc)
})
return
}
func (s *Service) CreateUpdateStreamerTextAudit(ctx *gin.Context, oldStreamer *dbstruct.Streamer, newStreamer *dbstruct.Streamer) (tasks []*textaudit.TextAuditTaskControlBlock) {
tasks = make([]*textaudit.TextAuditTaskControlBlock, 0)
func (s *Service) CreateUpdateStreamerTextAudit(ctx *gin.Context, oldStreamer *dbstruct.Streamer, newStreamer *dbstruct.Streamer) (tasks []*dbstruct.TextAuditTask) {
tasks = make([]*dbstruct.TextAuditTask, 0)
if newStreamer.Bio != nil {
rollBackFunc := func() error {
return _DefaultStreamer.OpUpdate(ctx, &streamerproto.OpUpdateReq{
Streamer: &dbstruct.Streamer{
Mid: oldStreamer.Mid,
Bio: oldStreamer.Bio,
},
})
}
tasks = append(tasks, textaudit.NewTextAuditTaskControlBlock(&dbstruct.TextAuditTask{
tasks = append(tasks, &dbstruct.TextAuditTask{
RouteUrl: goproto.String(ctx.Request.URL.Path),
AssociativeDatabase: goproto.String("streamer"),
AssociativeTableName: goproto.String("streamer"),
@ -60,20 +40,12 @@ func (s *Service) CreateUpdateStreamerTextAudit(ctx *gin.Context, oldStreamer *d
AssociativeTableColumn: goproto.String("bio"),
AuditedText: newStreamer.Bio,
OldText: oldStreamer.Bio,
}, rollBackFunc))
})
}
if newStreamer.AutoResponseMessage != nil {
rollBackFunc := func() error {
return _DefaultStreamer.OpUpdate(ctx, &streamerproto.OpUpdateReq{
Streamer: &dbstruct.Streamer{
Mid: oldStreamer.Mid,
AutoResponseMessage: oldStreamer.AutoResponseMessage,
},
})
}
tasks = append(tasks, textaudit.NewTextAuditTaskControlBlock(&dbstruct.TextAuditTask{
tasks = append(tasks, &dbstruct.TextAuditTask{
RouteUrl: goproto.String(ctx.Request.URL.Path),
AssociativeDatabase: goproto.String("streamer"),
AssociativeTableName: goproto.String("streamer"),
@ -81,60 +53,8 @@ func (s *Service) CreateUpdateStreamerTextAudit(ctx *gin.Context, oldStreamer *d
AssociativeTableColumn: goproto.String("auto_response_message"),
AuditedText: newStreamer.AutoResponseMessage,
OldText: oldStreamer.AutoResponseMessage,
}, rollBackFunc))
})
}
return
}
func (s *Service) GetUpdateTextFunc(ctx *gin.Context, task *dbstruct.TextAuditTask) func() error {
db := util.DerefString(task.AssociativeDatabase)
table := util.DerefString(task.AssociativeTableName)
column := util.DerefString(task.AssociativeTableColumn)
if db == dao.DBAccount && table == dao.DBAccount {
return s.GetAccountUpdateTextFunc(ctx, column, task.AuditedText, task.AssociativeTableId)
}
if db == dao.DBStreamer && table == dao.DBStreamer {
return s.GetStreamerUpdateTextFunc(ctx, column, task.AuditedText, task.AssociativeTableId)
}
return func() error { return nil }
}
func (s *Service) GetAccountUpdateTextFunc(ctx *gin.Context, column string, text *string, id *int64) func() error {
if column == "name" {
return func() error {
return _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{
Account: &dbstruct.Account{
Mid: id,
Name: text,
},
})
}
}
return func() error { return nil }
}
func (s *Service) GetStreamerUpdateTextFunc(ctx *gin.Context, column string, text *string, id *int64) func() error {
switch column {
case "bio":
return func() error {
return _DefaultStreamer.OpUpdate(ctx, &streamerproto.OpUpdateReq{
Streamer: &dbstruct.Streamer{
Mid: id,
Bio: text,
},
})
}
case "auto_response_message":
return func() error {
return _DefaultStreamer.OpUpdate(ctx, &streamerproto.OpUpdateReq{
Streamer: &dbstruct.Streamer{
Mid: id,
AutoResponseMessage: text,
},
})
}
}
return func() error { return nil }
}

View File

@ -0,0 +1,114 @@
package service
import (
"fmt"
"service/api/consts"
accountproto "service/api/proto/account/proto"
streamerproto "service/api/proto/streamer/proto"
"service/bizcommon/util"
"service/dbstruct"
"github.com/gin-gonic/gin"
)
var DefaultTextAuditTaskResultHandler *TextAuditTaskResultHandler
type TextAuditTaskResultHandler struct {
// 图像审核任务通过及回退方法生成器map
textAuditTaskUpdateFuncGeneratorMap map[string](func(ctx *gin.Context, task *dbstruct.TextAuditTask, option int) func() error)
}
func NewTextAuditTaskResultHandler() *TextAuditTaskResultHandler {
handler := &TextAuditTaskResultHandler{}
handler.initTextAuditTaskUpdateFuncGeneratorMap()
return handler
}
func (handler *TextAuditTaskResultHandler) Handle(ctx *gin.Context, task *dbstruct.TextAuditTask, option int) error {
return handler.getTextAuditTaskUpdateFunc(ctx, task, option)()
}
func (handler *TextAuditTaskResultHandler) initTextAuditTaskUpdateFuncGeneratorMap() {
handler.textAuditTaskUpdateFuncGeneratorMap = make(map[string]func(ctx *gin.Context, task *dbstruct.TextAuditTask, option int) func() error)
handler.generateAccountNameUpdateFunc()
handler.generateStreamerBioUpdateFunc()
handler.generateStreamerAutoResponseMessageUpdateFunc()
}
func (handler *TextAuditTaskResultHandler) getTextAuditTaskUpdateFunc(ctx *gin.Context, task *dbstruct.TextAuditTask, option int) func() error {
db := util.DerefString(task.AssociativeDatabase)
table := util.DerefString(task.AssociativeTableName)
column := util.DerefString(task.AssociativeTableColumn)
key := fmt.Sprintf("%v|%v|%v", db, table, column)
updateFuncGenerator := handler.textAuditTaskUpdateFuncGeneratorMap[key]
updateFunc := updateFuncGenerator(ctx, task, option)
return updateFunc
}
// 用户表->姓名
func (handler *TextAuditTaskResultHandler) generateAccountNameUpdateFunc() {
handler.textAuditTaskUpdateFuncGeneratorMap["account|account|name"] = func(ctx *gin.Context, task *dbstruct.TextAuditTask, option int) func() error {
return func() error {
mid := task.AssociativeTableId
var name *string
if option == consts.TextAuditTaskUpdate_Pass {
name = task.AuditedText
} else {
name = task.OldText
}
return _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{
Account: &dbstruct.Account{
Mid: mid,
Name: name,
},
})
}
}
}
// 主播表->个性签名
func (handler *TextAuditTaskResultHandler) generateStreamerBioUpdateFunc() {
handler.textAuditTaskUpdateFuncGeneratorMap["streamer|streamer|bio"] = func(ctx *gin.Context, task *dbstruct.TextAuditTask, option int) func() error {
return func() error {
mid := task.AssociativeTableId
var bio *string
if option == consts.TextAuditTaskUpdate_Pass {
bio = task.AuditedText
} else {
bio = task.OldText
}
return _DefaultStreamer.OpUpdate(ctx, &streamerproto.OpUpdateReq{
Streamer: &dbstruct.Streamer{
Mid: mid,
Bio: bio,
},
})
}
}
}
// 主播表->自动回复消息
func (handler *TextAuditTaskResultHandler) generateStreamerAutoResponseMessageUpdateFunc() {
handler.textAuditTaskUpdateFuncGeneratorMap["streamer|streamer|auto_response_message"] = func(ctx *gin.Context, task *dbstruct.TextAuditTask, option int) func() error {
return func() error {
mid := task.AssociativeTableId
var autoResponseMessage *string
if option == consts.TextAuditTaskUpdate_Pass {
autoResponseMessage = task.AuditedText
} else {
autoResponseMessage = task.OldText
}
return _DefaultStreamer.OpUpdate(ctx, &streamerproto.OpUpdateReq{
Streamer: &dbstruct.Streamer{
Mid: mid,
AutoResponseMessage: autoResponseMessage,
},
})
}
}
}

View File

@ -18,7 +18,6 @@ import (
"time"
"github.com/gin-gonic/gin"
"github.com/go-co-op/gocron"
xxl "github.com/xxl-job/xxl-job-executor-go"
goproto "google.golang.org/protobuf/proto"
)
@ -152,10 +151,28 @@ func (s *CronService) ImageAuditBatchHis(ctx context.Context, param *xxl.RunReq)
return fmt.Sprintf("batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)
}
func (s *CronService) TextAuditBatch() {
scheduler := gocron.NewScheduler(time.UTC)
scheduler.Every(10).Second().Do(func() {
textaudit.GiveNoticeToBatch()
})
scheduler.StartAsync()
func (s *CronService) TextAuditBatch(ctx context.Context, param *xxl.RunReq) (msg string) {
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
// 刷新批次号
batchId := textaudit.RefreshBatchId()
// 执行图像审核
successNum, failNum, err := textaudit.Run(batchId)
if err != nil {
return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)
}
logger.Info("Text audit batch ends...")
return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)
}
func (s *CronService) TextAuditBatchHis(ctx context.Context, param *xxl.RunReq) (msg string) {
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
// 获取批次号
batchId := param.ExecutorParams
// 执行图像审核
successNum, failNum, err := textaudit.Run(batchId)
if err != nil {
return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)
}
logger.Info("Text audit batch ends...")
return fmt.Sprintf("batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)
}

View File

@ -40,7 +40,7 @@ func NewImageAuditTaskBatchControlBlock(tasks []*dbstruct.ImageAuditTask, batchI
taskCtrlBlock := NewImageAuditTaskControlBlock(task)
ctrlBlock.TaskCtrlBlocks = append(ctrlBlock.TaskCtrlBlocks, taskCtrlBlock)
ctrlBlock.RecordAction(taskCtrlBlock)
ctrlBlock.RecordImage(taskCtrlBlock, mediaFillables, imageIndex, taskIndex)
ctrlBlock.RecordImage(taskCtrlBlock, &mediaFillables, imageIndex, taskIndex)
}
mediafiller.FillList(&gin.Context{}, mediaFillables)
@ -62,12 +62,12 @@ func (ctrlBlock *ImageAuditTaskBatchControlBlock) RecordAction(taskCtrlBlock *Im
ctrlBlock.ActionMap[taskCtrlBlock.ActionId].Record(taskCtrlBlock)
}
func (ctrlBlock *ImageAuditTaskBatchControlBlock) RecordImage(taskCtrlBlock *ImageAuditTaskControlBlock, mediaFillables []mediafiller.MediaFillable, imageIndex int, taskIndex int) {
func (ctrlBlock *ImageAuditTaskBatchControlBlock) RecordImage(taskCtrlBlock *ImageAuditTaskControlBlock, mediaFillables *[]mediafiller.MediaFillable, imageIndex int, taskIndex int) {
// 未分片
if util.DerefInt64(taskCtrlBlock.ImageAuditTask.IsFragmented) == 0 {
ctrlBlock.Images = append(ctrlBlock.Images, taskCtrlBlock.ImageAuditTask.AuditedMedia)
mediaFillables = append(mediaFillables, taskCtrlBlock.ImageAuditTask.AuditedMedia)
*mediaFillables = append(*mediaFillables, taskCtrlBlock.ImageAuditTask.AuditedMedia)
ctrlBlock.Img2taskIndexMap[imageIndex] = taskIndex
ctrlBlock.ImageAuditIds = append(ctrlBlock.ImageAuditIds, util.DerefString(taskCtrlBlock.ImageAuditTask.ImageAuditId))
imageIndex++
@ -79,7 +79,7 @@ func (ctrlBlock *ImageAuditTaskBatchControlBlock) RecordImage(taskCtrlBlock *Ima
ImageIds: util.Int64Slice([]int64{imageId}),
}
ctrlBlock.Images = append(ctrlBlock.Images, image)
mediaFillables = append(mediaFillables, image)
*mediaFillables = append(*mediaFillables, image)
ctrlBlock.ImageAuditIds = append(ctrlBlock.ImageAuditIds, imageAuditIds[i])
ctrlBlock.Img2taskIndexMap[imageIndex] = taskIndex
imageIndex++

View File

@ -39,7 +39,7 @@ func AddTask(task *dbstruct.ImageAuditTask) error {
task.FragmentsNum = goproto.Int64(1)
// 写入图像审核表
if err := prepareFragmentedImageAuditTask(task); err != nil {
if err := prepareNotFragmentedImageAuditTask(task); err != nil {
return err
}
} else {
@ -47,7 +47,7 @@ func AddTask(task *dbstruct.ImageAuditTask) error {
task.FragmentsNum = goproto.Int64(int64(fragmentNum))
// 写入图像审核表
if err := prepareNotFragmentedImageAuditTask(task); err != nil {
if err := prepareFragmentedImageAuditTask(task); err != nil {
return err
}
}

View File

@ -40,5 +40,5 @@ func genereteBatchId() string {
now := time.Now()
y, m, d := now.Date()
h, mi, s := now.Clock()
return fmt.Sprintf("%d%2d%2d%2d%2d%02d", y, m, d, h, mi, s)
return fmt.Sprintf("%d%02d%02d%02d%02d%02d", y, m, d, h, mi, s)
}

View File

@ -33,9 +33,14 @@ type TextAuditTaskService interface {
OpHandleOverdue(ctx *gin.Context, task *dbstruct.TextAuditTask, batchId string) error
}
type TextAuditTaskResultHandler interface {
Handle(ctx *gin.Context, task *dbstruct.TextAuditTask, option int) error
}
var defaultTextAuditClient *textaudit.Client
var _DefaultTextAudit TextAuditService
var _DefaultTextAuditTask TextAuditTaskService
var _DefaultResultHandler TextAuditTaskResultHandler
var labels []string
func ConnectToTextAuditClient(client *textaudit.Client) {
@ -54,20 +59,14 @@ func Init(cfg *configcenter.TextAuditConfig) (err error) {
// 初始化调度器
initScheduler(cfg)
// 启动调度任务
defaultTextAuditTaskScheduler.Run()
return
}
func GiveNoticeToBatch() {
defaultTextAuditTaskScheduler.GiveNoticeToBatch()
}
func ConnectToTextAuditService(serivce TextAuditService) {
_DefaultTextAudit = serivce
}
func ConnectToTextAuditTaskService(service TextAuditTaskService) {
func ConnectToTextAuditTaskService(service TextAuditTaskService, handler TextAuditTaskResultHandler) {
_DefaultTextAuditTask = service
_DefaultResultHandler = handler
}

View File

@ -0,0 +1,83 @@
package textaudit
import (
"fmt"
"service/api/consts"
"service/bizcommon/util"
"service/dbstruct"
"service/library/logger"
"github.com/gin-gonic/gin"
goproto "google.golang.org/protobuf/proto"
)
// 批次图像审核任务控制块
type TextAuditTaskBatchControlBlock struct {
BatchId string // 批次号
TaskCtrlBlocks []*TextAuditTaskControlBlock // 任务控制块
ActionMap map[string]*TextAuditAction // 动作Id号-action的map
}
func NewTextAuditTaskBatchControlBlock(tasks []*dbstruct.TextAuditTask, batchId string) *TextAuditTaskBatchControlBlock {
if len(tasks) == 0 {
return nil
}
ctrlBlock := &TextAuditTaskBatchControlBlock{
BatchId: batchId,
TaskCtrlBlocks: make([]*TextAuditTaskControlBlock, 0),
ActionMap: make(map[string]*TextAuditAction),
}
for _, task := range tasks {
taskCtrlBlock := NewTextAuditTaskControlBlock(task)
ctrlBlock.TaskCtrlBlocks = append(ctrlBlock.TaskCtrlBlocks, taskCtrlBlock)
ctrlBlock.RecordAction(taskCtrlBlock)
}
return ctrlBlock
}
func (ctrlBlock *TextAuditTaskBatchControlBlock) RecordAction(taskCtrlBlock *TextAuditTaskControlBlock) {
ctx := &gin.Context{}
// 写map记录
if ctrlBlock.ActionMap[taskCtrlBlock.ActionId] == nil { // 写入ctrlBlock.ActionMap
ctrlBlock.ActionMap[taskCtrlBlock.ActionId] = NewTextAuditAction()
// 将此前批次对该元素未成功,待处理的审核全部置为已失效
if err := _DefaultTextAuditTask.OpHandleOverdue(ctx, taskCtrlBlock.TextAuditTask, util.DerefString(taskCtrlBlock.TextAuditTask.BatchId)); err != nil {
logger.Error("_DefaultTextAuditTask OpHandleOverdue fail :%v", err)
}
}
ctrlBlock.ActionMap[taskCtrlBlock.ActionId].Record(taskCtrlBlock)
}
// 文字审核任务控制块
// ActionId设计初衷由于文字审核是定时任务触发的批量作业如果在一次作业间隔有针对同一个文字媒体的多次更新则会提交关于它的多次审核需要配合乐观锁保证数据一致性
type TextAuditTaskControlBlock struct {
// 静态元素
ActionId string // 审核动作id号由文字审核实体数据库四要素拼接而成用于指示对数据库-表-单条数据-文字字段的审核动作
TextAuditTask *dbstruct.TextAuditTask // 审核任务
// 动态元素
IsTaskPassed bool // 任务状态,仅当任务已完成时有效
IsGivingNoticeToBatch bool // 是否进行批处理
}
// 新建文字审核任务块
func NewTextAuditTaskControlBlock(task *dbstruct.TextAuditTask) (tcb *TextAuditTaskControlBlock) {
if task == nil || task.AuditedText == nil {
return
}
task.BatchId = goproto.String(defaultTextAuditTaskScheduler.batchId)
task.Status = goproto.Int64(consts.TextAudit_Created)
tcb = &TextAuditTaskControlBlock{
ActionId: fmt.Sprintf("%v%v%v%v", util.DerefString(task.AssociativeDatabase), util.DerefString(task.AssociativeTableName),
util.DerefInt64(task.AssociativeTableId), util.DerefString(task.AssociativeTableColumn)),
TextAuditTask: task,
IsTaskPassed: true,
IsGivingNoticeToBatch: false,
}
return
}

View File

@ -1,11 +1,9 @@
package textaudit
import (
"fmt"
"service/api/consts"
textauditproto "service/api/proto/textaudit/proto"
textaudittaskproto "service/api/proto/textaudittask/proto"
"service/bizcommon/util"
"service/dbstruct"
"service/library/logger"
@ -13,78 +11,47 @@ import (
goproto "google.golang.org/protobuf/proto"
)
// 文字审核任务控制块
// ActionId设计初衷由于文字审核是定时任务触发的批量作业如果在一次作业间隔有针对同一个文字媒体的多次更新则会提交关于它的多次审核需要配合乐观锁保证数据一致性
type TextAuditTaskControlBlock struct {
// 静态元素
ActionId string // 审核动作id号由文字审核实体数据库四要素拼接而成用于指示对数据库-表-单条数据-文字字段的审核动作
TextAuditTask *dbstruct.TextAuditTask // 审核任务
RollbackFunc func() error // 审核失败时的回退方法
// 动态元素
IsTaskPassed bool // 任务状态,仅当任务已完成时有效
IsGivingNoticeToBatch bool // 是否进行批处理
func AddTasks(tasks []*dbstruct.TextAuditTask) error {
for _, task := range tasks {
err := AddTask(task)
if err != nil {
return err
}
}
return nil
}
// 新建文字审核任务块
func NewTextAuditTaskControlBlock(task *dbstruct.TextAuditTask, rollbackFunc func() error) (tcb *TextAuditTaskControlBlock) {
func AddTask(task *dbstruct.TextAuditTask) error {
if task == nil || task.AuditedText == nil {
return
return nil
}
task.BatchId = goproto.String(defaultTextAuditTaskScheduler.batchId)
task.Status = goproto.Int64(consts.TextAudit_Created)
tcb = &TextAuditTaskControlBlock{
ActionId: fmt.Sprintf("%v%v%v%v", util.DerefString(task.AssociativeDatabase), util.DerefString(task.AssociativeTableName),
util.DerefInt64(task.AssociativeTableId), util.DerefString(task.AssociativeTableColumn)),
TextAuditTask: task,
RollbackFunc: rollbackFunc,
IsTaskPassed: true,
IsGivingNoticeToBatch: false,
}
return
}
func AddTask(task *TextAuditTaskControlBlock) {
if err := task.Prepare(); err != nil {
logger.Error("TextAuditTaskPrepareException: %v", err)
return
}
defaultTextAuditTaskScheduler.AddTask(task)
}
// 将文字审核任务信息写入数据库,准备送审
func (task *TextAuditTaskControlBlock) Prepare() (err error) {
if task == nil {
return fmt.Errorf("task is nil, check your input")
}
// 1.写入文字审核表
ctx := &gin.Context{}
textAudit := &dbstruct.TextAudit{
AuditedText: task.TextAuditTask.AuditedText,
BatchId: task.TextAuditTask.BatchId,
AuditedText: task.AuditedText,
BatchId: task.BatchId,
Status: goproto.Int64(consts.TextAudit_Created),
}
if err = _DefaultTextAudit.OpCreate(ctx, &textauditproto.OpCreateReq{
if err := _DefaultTextAudit.OpCreate(ctx, &textauditproto.OpCreateReq{
TextAudit: textAudit,
}); err != nil {
logger.Error("Textaudit OpCreate failed: %v", err)
return
return err
}
task.TextAuditTask.TextAuditId = textAudit.Id
task.TextAuditId = textAudit.Id
// 2.写入文字审核任务表
if err := _DefaultTextAuditTask.OpCreate(&gin.Context{}, &textaudittaskproto.OpCreateReq{
TextAuditTask: task.TextAuditTask,
TextAuditTask: task,
}); err != nil {
logger.Error("Textaudittask OpCreate failed: %v", err)
return err
}
return nil
}

View File

@ -3,7 +3,6 @@ package textaudit
import (
"fmt"
"service/library/configcenter"
"service/library/logger"
"time"
)
@ -12,11 +11,7 @@ var defaultTextAuditTaskScheduler *TextAuditTaskScheduler
// 文字审核任务调度器
type TextAuditTaskScheduler struct {
// 缓冲池、同步标志
taskBuffer chan *TextAuditTaskControlBlock // 缓存待处理任务的缓冲池
batchFlag chan bool // 批处理同步标志
// 数据存储
taskPacket []*TextAuditTaskControlBlock // 一次批处理的任务包
batchFlag chan bool // 批处理同步标志
// 状态记录
batchId string // 当前批次号
@ -24,26 +19,12 @@ type TextAuditTaskScheduler struct {
func initScheduler(cfg *configcenter.TextAuditConfig) {
defaultTextAuditTaskScheduler = &TextAuditTaskScheduler{
taskBuffer: make(chan *TextAuditTaskControlBlock, cfg.TaskBufferSize),
batchFlag: make(chan bool, 1),
taskPacket: make([]*TextAuditTaskControlBlock, 0),
batchId: genereteBatchId(),
batchFlag: make(chan bool, 1),
batchId: genereteBatchId(),
}
defaultTextAuditTaskScheduler.batchFlag <- true
}
// 将任务加入缓冲池
func (s *TextAuditTaskScheduler) AddTask(task *TextAuditTaskControlBlock) {
s.taskBuffer <- task
}
// 通知进行批处理
func (s *TextAuditTaskScheduler) GiveNoticeToBatch() {
s.taskBuffer <- &TextAuditTaskControlBlock{
IsGivingNoticeToBatch: true,
}
}
// 批处理上锁
func (s *TextAuditTaskScheduler) lock() {
<-s.batchFlag
@ -54,47 +35,10 @@ func (s *TextAuditTaskScheduler) unLock() {
s.batchFlag <- true
}
// 调度方法
func (s *TextAuditTaskScheduler) Run() {
go func() {
// 1.循环取出缓冲池中的任务
// 2.如果任务标志进行批处理,则将任务包打包,进行审核作业,最后清空任务包,生成新的批次号
// 3.否则,将任务加入任务包中,并为任务建立索引,记录任务执行顺序
for {
task := <-s.taskBuffer
if task.IsGivingNoticeToBatch {
// 上锁
s.lock()
// 打包任务
packet := s.taskPacket
s.taskPacket = make([]*TextAuditTaskControlBlock, 0)
// 取出状态并重置
batchId := s.batchId
s.batchId = genereteBatchId()
// 执行审核作业
go func() {
err := executeTextAuditTasks(packet, batchId)
if err != nil {
logger.Error("Batch failed : %v", err)
}
// 解锁
s.unLock()
}()
} else {
s.taskPacket = append(s.taskPacket, task)
}
}
}()
}
// 生成批次号
func genereteBatchId() string {
now := time.Now()
y, m, d := now.Date()
h, mi, s := now.Clock()
return fmt.Sprintf("%d%d%d%d%d%02d", y, m, d, h, mi, s)
return fmt.Sprintf("%d%02d%02d%02d%02d%02d", y, m, d, h, mi, s)
}

View File

@ -16,64 +16,69 @@ import (
"github.com/gin-gonic/gin"
)
// 文字审核主逻辑
func executeTextAuditTasks(tasks []*TextAuditTaskControlBlock, batchId string) (err error) {
// 刷新批次号
func RefreshBatchId() string {
batchId := defaultTextAuditTaskScheduler.batchId
defaultTextAuditTaskScheduler.batchId = genereteBatchId()
return batchId
}
if len(tasks) == 0 {
return
func Run(batchId string) (successNum int, failNum int, err error) {
// 查询该批次所有审核任务
textaudittasks, err := _DefaultTextAuditTask.OpList(&gin.Context{}, &textaudittaskproto.OpListReq{
BatchId: goproto.String(batchId),
Status: goproto.Int64(consts.TextAudit_Created),
Sort: "ct",
})
if err != nil {
logger.Info("_DefaultTextAuditTask OpList fail: %v", err)
}
logger.Info("Text audit batch started, batchId : %v, task number : %v\n", batchId, len(tasks))
if len(textaudittasks) == 0 {
return 0, 0, nil
}
// 1.创建请求
req, actionMap, err := createScanTextRequest(tasks, batchId)
// 上锁
defaultTextAuditTaskScheduler.lock()
logger.Info("Text audit batch started, batchId : %v, task number : %v", batchId, len(textaudittasks))
// 1.构建批量任务控制块初始化必要的actionMap、填充送审text等信息
ctrlBlock := NewTextAuditTaskBatchControlBlock(textaudittasks, batchId)
// 2.创建请求
req, err := createScanTextRequest(ctrlBlock)
if err != nil {
logger.Info("Create Scan TextRequest fail: %v", err)
handleBatchError(tasks, batchId, err)
handleBatchError(ctrlBlock, err)
return
}
// 2.调用阿里的服务,收到应答
// 3.调用阿里的服务,收到应答
runtime := &teautils.RuntimeOptions{
ConnectTimeout: tea.Int(30000),
}
_result, err := defaultTextAuditClient.ScanTextWithOptions(req, runtime)
if err != nil {
logger.Error("ScanTextWithOptions fail : %v", err)
handleBatchError(tasks, batchId, err)
handleBatchError(ctrlBlock, err)
return
}
// 3.处理应答
err = handleScanTextResponse(_result, batchId, tasks, actionMap)
// 4.处理应答
err = handleScanTextResponse(ctrlBlock, _result)
// 解锁
defaultTextAuditTaskScheduler.unLock()
successNum = len(textaudittasks)
return
}
func createScanTextRequest(tasks []*TextAuditTaskControlBlock, batchId string) (request *textaudit.ScanTextRequest, actionMap map[string]*TextAuditAction, err error) {
ctx := &gin.Context{}
actionMap = make(map[string]*TextAuditAction) // 动作Id号-action的map
// 1.打包文字转成ScanTextRequest
func createScanTextRequest(ctrlBlock *TextAuditTaskBatchControlBlock) (request *textaudit.ScanTextRequest, err error) {
reqTasks := make([]*textaudit.ScanTextRequestTasks, 0)
for _, task := range tasks {
// 写map
if actionMap[task.ActionId] == nil { // 写入actionMap
actionMap[task.ActionId] = NewTextAuditAction()
// 将此前批次对该元素未成功,待处理的审核全部置为已失效
if err := _DefaultTextAuditTask.OpHandleOverdue(ctx, task.TextAuditTask, batchId); err != nil {
logger.Error("_DefaultTextAuditTask OpHandleOverdue fail :%v", err)
}
}
actionMap[task.ActionId].Record(task)
// 打包
for _, taskCtrlBlock := range ctrlBlock.TaskCtrlBlocks {
reqTasks = append(reqTasks, &textaudit.ScanTextRequestTasks{
Content: task.TextAuditTask.AuditedText,
Content: taskCtrlBlock.TextAuditTask.AuditedText,
})
}
@ -92,23 +97,25 @@ func createScanTextRequest(tasks []*TextAuditTaskControlBlock, batchId string) (
return
}
func handleScanTextResponse(resp *textaudit.ScanTextResponse, batchId string, tasks []*TextAuditTaskControlBlock, actionMap map[string]*TextAuditAction) (err error) {
func handleScanTextResponse(ctrlBlock *TextAuditTaskBatchControlBlock, resp *textaudit.ScanTextResponse) (err error) {
results := resp.Body.Data.Elements
taskCtrlBlocks := ctrlBlock.TaskCtrlBlocks
actionMap := ctrlBlock.ActionMap
for i, result := range results {
isActionCompleted := false
action := &TextAuditAction{}
// 1.立即在textaudit-textaudit中更新该次审核结果
pass, err := handleTextAudit(tasks[i].TextAuditTask.TextAuditId, result)
pass, err := handleTextAudit(taskCtrlBlocks[i].TextAuditTask.TextAuditId, result)
if err != nil {
logger.Error("handleTextAudit fail: %v", err)
}
// 2.更新任务状态
tasks[i].IsTaskPassed = pass
taskCtrlBlocks[i].IsTaskPassed = pass
// 3.通过task的actionId去actionId-[]*task的map查出本批次对该字段的动作链更新其中关于自己的状态
isActionCompleted, action = handleTaskAction(tasks[i], actionMap)
isActionCompleted, action = handleTaskAction(taskCtrlBlocks[i], actionMap)
// 4.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
if isActionCompleted {
@ -151,17 +158,15 @@ func handleTaskAction(task *TextAuditTaskControlBlock, actionMap map[string]*Tex
// 等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
func finalizeTask(action *TextAuditAction) (err error) {
lastValidTask := action.TaskChain[0] // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务
lastValidTaskIndex := 0 // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务的索引
passTaskIds := make([]string, 0) // 所有已审核成功的任务的taskId
expiredTaskIds := make([]string, 0) // 所有已失效任务的taskId失效任务是指那些前驱任务依然失败的失败任务这些任务已失去实际意义不可以用来回退
statuses := make([]int64, len(action.TaskChain)) // 所有任务的终态状态,成功,回退,或失效
isActionInPassedStatus := true // 当前动作链是否处于成功中
lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0 // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务及其索引
passTaskIds := make([]string, 0) // 所有已审核成功的任务的taskId
expiredTaskIds := make([]string, 0) // 所有已失效任务的taskId失效任务是指那些前驱任务依然失败的失败任务这些任务已失去实际意义不可以用来回退
statuses := make([]int64, len(action.TaskChain)) // 所有任务的终态状态,成功,回退,或失效
isActionInPassedStatus := true // 当前动作链是否处于成功中
for i, task := range action.TaskChain {
if !task.IsTaskPassed { // 动作链检测到失败任务
if isActionInPassedStatus { // 若动作链处于成功状态,则这是目前为止,最后一个可以用于回退的任务
lastValidTask = task
lastValidTaskIndex = i
lastValidTask, lastValidTaskIndex = task, i
statuses[i] = consts.TextAudit_Rollbacked
isActionInPassedStatus = false
} else { // 否则,若动作链处于失败状态,则该任务已失效
@ -180,7 +185,7 @@ func finalizeTask(action *TextAuditAction) (err error) {
}
// 判定任务链终态,终态非成功,执行回退操作,记录到数据库中
if !action.IsPassed() {
if err = executeRollBack(lastValidTask); err != nil {
if err = executeRollBack(lastValidTask.TextAuditTask); err != nil {
return
}
}
@ -246,13 +251,13 @@ func parseDetailsContexts(contexts []*textaudit.ScanTextResponseBodyDataElements
return ctxs
}
func executeRollBack(lastValidTask *TextAuditTaskControlBlock) (err error) {
func executeRollBack(lastValidTask *dbstruct.TextAuditTask) (err error) {
ctx := &gin.Context{}
if err = lastValidTask.RollbackFunc(); err != nil {
logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.TextAuditTask.Id), err)
if err = _DefaultResultHandler.Handle(ctx, lastValidTask, consts.TextAuditTaskUpdate_Rollback); err != nil {
logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.Id), err)
if err = _DefaultTextAuditTask.OpUpdate(ctx, &textaudittaskproto.OpUpdateReq{
TextAuditTask: &dbstruct.TextAuditTask{
Id: lastValidTask.TextAuditTask.Id,
Id: lastValidTask.Id,
Status: goproto.Int64(consts.TextAudit_Failed),
Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
},
@ -263,7 +268,7 @@ func executeRollBack(lastValidTask *TextAuditTaskControlBlock) (err error) {
}
if err = _DefaultTextAuditTask.OpUpdate(ctx, &textaudittaskproto.OpUpdateReq{
TextAuditTask: &dbstruct.TextAuditTask{
Id: lastValidTask.TextAuditTask.Id,
Id: lastValidTask.Id,
Status: goproto.Int64(consts.TextAudit_Rollbacked),
Remarks: goproto.String("任务审核失败,已将字段回退至之前的版本"),
},
@ -302,19 +307,19 @@ func updateExpiredTasks(expiredTaskIds []string) (err error) {
return
}
func handleBatchError(tasks []*TextAuditTaskControlBlock, batchId string, _err error) (err error) {
logger.Info("All tasks of this batchId: %v has failed, rolling back...", batchId)
func handleBatchError(ctrlBlock *TextAuditTaskBatchControlBlock, _err error) (err error) {
logger.Info("All tasks of this batchId: %v has failed, rolling back...", ctrlBlock.BatchId)
ctx := &gin.Context{}
if err = _DefaultTextAudit.OpUpdateByBatchId(ctx, batchId, &dbstruct.TextAudit{
Status: goproto.Int64(consts.ImageAudit_ServiceFailed),
if err = _DefaultTextAudit.OpUpdateByBatchId(ctx, ctrlBlock.BatchId, &dbstruct.TextAudit{
Status: goproto.Int64(consts.TextAudit_ServiceFailed),
Remarks: goproto.String("批次任务失败原因详见task表"),
}); err != nil {
logger.Error("_DefaultTextAudit OpUpdateByBatchId fail: %v\n", err)
return
}
if err = _DefaultTextAuditTask.OpUpdateByBatchId(ctx, batchId, &dbstruct.TextAuditTask{
Status: goproto.Int64(consts.ImageAudit_ServiceFailed),
if err = _DefaultTextAuditTask.OpUpdateByBatchId(ctx, ctrlBlock.BatchId, &dbstruct.TextAuditTask{
Status: goproto.Int64(consts.TextAudit_ServiceFailed),
Remarks: goproto.String(_err.Error()),
}); err != nil {
logger.Error("_DefaultTextAuditTask OpUpdateByBatchId fail: %v\n", err)
@ -322,12 +327,13 @@ func handleBatchError(tasks []*TextAuditTaskControlBlock, batchId string, _err e
}
// 回退
for _, task := range tasks {
if err = task.RollbackFunc(); err != nil {
for _, taskCtrlBlock := range ctrlBlock.TaskCtrlBlocks {
task := taskCtrlBlock.TextAuditTask
if err = _DefaultResultHandler.Handle(ctx, task, consts.TextAuditTaskUpdate_Rollback); err != nil {
if err = _DefaultTextAuditTask.OpUpdate(ctx, &textaudittaskproto.OpUpdateReq{
TextAuditTask: &dbstruct.TextAuditTask{
Id: task.TextAuditTask.Id,
Status: goproto.Int64(consts.ImageAudit_Failed),
Id: task.Id,
Status: goproto.Int64(consts.TextAudit_Failed),
Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
},
}); err != nil {
@ -335,5 +341,6 @@ func handleBatchError(tasks []*TextAuditTaskControlBlock, batchId string, _err e
}
}
}
return
}