service/library/contentaudit/textaudit/textaudit.go

347 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package textaudit
import (
	"errors"

	"service/api/consts"
	textauditproto "service/api/proto/textaudit/proto"
	textaudittaskproto "service/api/proto/textaudittask/proto"
	"service/bizcommon/util"
	"service/dbstruct"
	"service/library/logger"

	textaudit "github.com/alibabacloud-go/imageaudit-20191230/v3/client"
	teautils "github.com/alibabacloud-go/tea-utils/v2/service"
	"github.com/alibabacloud-go/tea/tea"
	"github.com/gin-gonic/gin"
	goproto "google.golang.org/protobuf/proto"
)
// RefreshBatchId rotates the scheduler's batch ID: it stores the value
// returned by genereteBatchId on the default scheduler and returns the ID
// that was stored there before the call.
func RefreshBatchId() string {
	previous := defaultTextAuditTaskScheduler.batchId
	defaultTextAuditTaskScheduler.batchId = genereteBatchId()
	return previous
}
// Run audits every task of the given batch that is still in the Created state.
//
// It lists the batch's tasks, packs them into one ScanText request, sends the
// request to the audit service and hands the response to
// handleScanTextResponse.
//
// Returns:
//   - successNum: number of tasks submitted when the batch was sent and its
//     response handled without error.
//   - failNum: number of tasks in the batch when the request could not be
//     built or the service call failed (handleBatchError is invoked for them).
//   - err: the listing, request-building, service-call or response-handling
//     error; both counters are zero when listing or response handling fails.
func Run(batchId string) (successNum int, failNum int, err error) {
	// Look up all audit tasks of this batch.
	textaudittasks, err := _DefaultTextAuditTask.OpList(&gin.Context{}, &textaudittaskproto.OpListReq{
		BatchId: goproto.String(batchId),
		Status:  goproto.Int64(consts.TextAudit_Created),
		Sort:    "ct",
	})
	if err != nil {
		// Surface the failure instead of reporting an empty, successful batch.
		logger.Error("_DefaultTextAuditTask OpList fail: %v", err)
		return 0, 0, err
	}
	if len(textaudittasks) == 0 {
		return 0, 0, nil
	}

	// Take the scheduler lock; the deferred unLock releases it on every
	// return path, including the error returns below.
	defaultTextAuditTaskScheduler.lock()
	defer defaultTextAuditTaskScheduler.unLock()

	logger.Info("Text audit batch started, batchId : %v, task number : %v", batchId, len(textaudittasks))

	// 1. Build the batch control block (action map, texts to audit, ...).
	ctrlBlock := NewTextAuditTaskBatchControlBlock(textaudittasks, batchId)

	// 2. Build the request.
	req, err := createScanTextRequest(ctrlBlock)
	if err != nil {
		logger.Error("Create Scan TextRequest fail: %v", err)
		// handleBatchError logs its own failures; the original error is the
		// one reported to the caller.
		_ = handleBatchError(ctrlBlock, err)
		return 0, len(textaudittasks), err
	}

	// 3. Call the audit service and wait for the reply.
	runtime := &teautils.RuntimeOptions{
		ConnectTimeout: tea.Int(30000),
	}
	_result, err := defaultTextAuditClient.ScanTextWithOptions(req, runtime)
	if err != nil {
		logger.Error("ScanTextWithOptions fail : %v", err)
		_ = handleBatchError(ctrlBlock, err)
		return 0, len(textaudittasks), err
	}

	// 4. Process the reply.
	if err = handleScanTextResponse(ctrlBlock, _result); err != nil {
		logger.Error("handleScanTextResponse fail: %v", err)
		return 0, 0, err
	}
	return len(textaudittasks), 0, nil
}
// createScanTextRequest packs a batch into a single ScanText request.
//
// Each task control block in ctrlBlock contributes one request task whose
// content is the task's AuditedText, and each entry of the package-level
// labels list contributes one request label. The packed tasks are logged.
// The returned error is always nil.
func createScanTextRequest(ctrlBlock *TextAuditTaskBatchControlBlock) (request *textaudit.ScanTextRequest, err error) {
	tasks := make([]*textaudit.ScanTextRequestTasks, 0, len(ctrlBlock.TaskCtrlBlocks))
	for _, block := range ctrlBlock.TaskCtrlBlocks {
		task := &textaudit.ScanTextRequestTasks{Content: block.TextAuditTask.AuditedText}
		tasks = append(tasks, task)
	}

	labelFilters := make([]*textaudit.ScanTextRequestLabels, 0, len(labels))
	for _, name := range labels {
		labelFilters = append(labelFilters, &textaudit.ScanTextRequestLabels{
			Label: goproto.String(name),
		})
	}

	request = &textaudit.ScanTextRequest{
		Labels: labelFilters,
		Tasks:  tasks,
	}
	logger.Info("本次打包:%v", tasks)
	return request, nil
}
// handleScanTextResponse applies the audit service's reply to the batch.
//
// Result i of the response is matched by position with task control block i.
// For each pair it stores the audit result, records whether the task passed,
// notifies the task's action, and finalizes the action once all of its tasks
// have reported.
//
// Failures while storing a single result or finalizing a single action are
// logged and do not stop the loop. The returned error is non-nil only when
// the response carries no data or when the number of results differs from
// the number of tasks; in the latter case the overlapping prefix is still
// processed.
func handleScanTextResponse(ctrlBlock *TextAuditTaskBatchControlBlock, resp *textaudit.ScanTextResponse) (err error) {
	if resp == nil || resp.Body == nil || resp.Body.Data == nil {
		return errors.New("scan text response has no data")
	}
	results := resp.Body.Data.Elements
	taskCtrlBlocks := ctrlBlock.TaskCtrlBlocks
	actionMap := ctrlBlock.ActionMap

	if len(results) != len(taskCtrlBlocks) {
		logger.Error("scan text result count mismatch, results : %v, tasks : %v", len(results), len(taskCtrlBlocks))
		err = errors.New("scan text result count does not match task count")
	}

	for i, result := range results {
		if i >= len(taskCtrlBlocks) {
			// More results than tasks: nothing left to pair them with.
			break
		}
		taskCtrlBlock := taskCtrlBlocks[i]

		// 1. Persist this audit result right away.
		pass, auditErr := handleTextAudit(taskCtrlBlock.TextAuditTask.TextAuditId, result)
		if auditErr != nil {
			logger.Error("handleTextAudit fail: %v", auditErr)
		}

		// 2. Record the task outcome.
		taskCtrlBlock.IsTaskPassed = pass

		// 3. Tell the task's action (looked up by action ID) that this task
		// has been audited.
		isActionCompleted, action := handleTaskAction(taskCtrlBlock, actionMap)

		// 4. Once every task of the action has arrived, decide the outcome of
		// the action for this batch.
		if isActionCompleted {
			if finalizeErr := finalizeTask(action); finalizeErr != nil {
				logger.Error("finalizeTask fail: %v", finalizeErr)
			}
		}
	}
	return err
}
// handleTextAudit stores one scan result on the text-audit record with the
// given id.
//
// It copies the result into a fresh record via copyScanResultInfo and then
// submits the record through _DefaultTextAudit.OpUpdate. pass is whatever
// copyScanResultInfo reported; err is the copy error or, failing that, the
// update error. Both failures are logged here.
func handleTextAudit(id *string, result *textaudit.ScanTextResponseBodyDataElements) (pass bool, err error) {
	record := &dbstruct.TextAudit{Id: id}

	if pass, err = copyScanResultInfo(record, result); err != nil {
		logger.Error("Copy Scan Result Info fail: %v\n", err)
		return pass, err
	}

	updateReq := &textauditproto.OpUpdateReq{TextAudit: record}
	if err = _DefaultTextAudit.OpUpdate(&gin.Context{}, updateReq); err != nil {
		logger.Error("_DefaultTextAudit OpUpdate fail: %v\n", err)
	}
	return pass, err
}
// handleTaskAction looks up the action a task belongs to (by the task's
// ActionId) and records that one more of the action's tasks has been audited.
//
// It returns the action together with a flag that is true once the action's
// AuditedTaskNum equals its TaskNum. When task is nil or no action is
// registered under the task's ActionId, the problem is logged and
// (false, nil) is returned instead of dereferencing a nil pointer.
func handleTaskAction(task *TextAuditTaskControlBlock, actionMap map[string]*TextAuditAction) (isActionCompleted bool, action *TextAuditAction) {
	if task == nil {
		logger.Error("handleTaskAction fail: task is nil")
		return false, nil
	}
	action = actionMap[task.ActionId]
	if action == nil {
		logger.Error("handleTaskAction fail: no action for actionId %v", task.ActionId)
		return false, nil
	}
	action.AuditedTaskNum++
	return action.TaskNum == action.AuditedTaskNum, action
}
// finalizeTask decides the outcome of an action once every task in its chain
// has been audited.
//
// It walks action.TaskChain in order and classifies each task:
//   - a passed task is marked Passed;
//   - the first failed task after a pass (or at the start) is the rollback
//     candidate and is marked Rollbacked;
//   - further failed tasks that directly follow a failure are marked Expired;
//   - when a later task passes, the current rollback candidate is demoted to
//     Expired because it will no longer be used for rolling back.
//
// If action.IsPassed() reports false, the last rollback candidate is rolled
// back via executeRollBack. Passed and expired task IDs are then written
// through updatePassedTasks and updateExpiredTasks. The first error from any
// of these steps is returned and the remaining steps are skipped.
//
// An error is returned, without touching anything, when action is nil or its
// task chain is empty.
func finalizeTask(action *TextAuditAction) (err error) {
	if action == nil || len(action.TaskChain) == 0 {
		return errors.New("finalizeTask: action has no task chain")
	}

	// Only meaningful when the chain does not end in a pass: the most recent
	// failed task that may be used for rollback, and its index.
	lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0
	passTaskIds := make([]string, 0)                 // IDs of all tasks that passed the audit
	expiredTaskIds := make([]string, 0)              // IDs of failed tasks that cannot be used for rollback
	statuses := make([]int64, len(action.TaskChain)) // final status per task: passed, rolled back or expired
	isActionInPassedStatus := true                   // whether the chain is currently in a passing state

	for i, task := range action.TaskChain {
		if !task.IsTaskPassed { // failed task
			if isActionInPassedStatus {
				// The chain was passing, so this is now the latest task that
				// can serve as the rollback point.
				lastValidTask, lastValidTaskIndex = task, i
				statuses[i] = consts.TextAudit_Rollbacked
				isActionInPassedStatus = false
			} else {
				// The chain was already failing: this task is expired.
				statuses[i] = consts.TextAudit_Expired
				expiredTaskIds = append(expiredTaskIds, util.DerefString(task.TextAuditTask.Id))
			}
		} else { // passed task
			// Reset the chain to passing and expire the previous rollback
			// candidate, which is no longer needed.
			passTaskIds = append(passTaskIds, util.DerefString(task.TextAuditTask.Id))
			statuses[i] = consts.TextAudit_Passed
			isActionInPassedStatus = true
			if statuses[lastValidTaskIndex] == consts.TextAudit_Rollbacked {
				statuses[lastValidTaskIndex] = consts.TextAudit_Expired
				expiredTaskIds = append(expiredTaskIds, util.DerefString(action.TaskChain[lastValidTaskIndex].TextAuditTask.Id))
			}
		}
	}

	// If the action did not end up passed, roll back using the last candidate.
	if !action.IsPassed() {
		if err = executeRollBack(lastValidTask.TextAuditTask); err != nil {
			return err
		}
	}

	// Persist the status of every classified task.
	if err = updatePassedTasks(passTaskIds); err != nil {
		return err
	}
	if err = updateExpiredTasks(expiredTaskIds); err != nil {
		return err
	}
	logger.Info("action statuses : %v", statuses)
	return nil
}
// copyScanResultInfo copies one element of a scan response into the given
// text-audit record and reports whether the text passed.
//
// For every result in elements.Results the record's Suggestion, Label and
// Rate are overwritten, so the last result wins for those fields. A result
// whose label is not LabelPassed marks the record Rejected, sets pass to
// false, and stores the contexts of each detail on the per-label field that
// matches the detail's label (details with other labels are ignored).
//
// Once any result has been rejected, a later passing result no longer resets
// Status to Passed, so the stored status always agrees with the returned
// pass flag. Nil results and nil details are skipped.
//
// Returns pass=false and an error when elements is nil. An element without
// results yields pass=true and leaves the record untouched.
func copyScanResultInfo(textaudit *dbstruct.TextAudit, elements *textaudit.ScanTextResponseBodyDataElements) (pass bool, err error) {
	if elements == nil {
		return false, errors.New("copyScanResultInfo: scan result element is nil")
	}

	pass = true
	for _, result := range elements.Results {
		if result == nil {
			continue
		}
		textaudit.Suggestion = result.Suggestion
		textaudit.Label = result.Label
		textaudit.Rate = goproto.Float64(float64(util.DerefFloat32(result.Rate)))

		if util.DerefString(textaudit.Label) == LabelPassed {
			// Do not let a passing result hide an earlier rejection.
			if pass {
				textaudit.Status = goproto.Int64(consts.TextAudit_Passed)
			}
			continue
		}

		pass = false
		textaudit.Status = goproto.Int64(consts.TextAudit_Rejected)
		for _, detail := range result.Details {
			if detail == nil {
				continue
			}
			label := TextAuditLabel(util.DerefString(detail.Label))
			contexts := parseDetailsContexts(detail.Contexts)
			switch label {
			case Spam:
				textaudit.SpamLabelDetails = &contexts
			case Politics:
				textaudit.PoliticsLabelDetails = &contexts
			case Abuse:
				textaudit.AbuseLabelDetails = &contexts
			case Terrorism:
				textaudit.TerrorismLabelDetails = &contexts
			case Porn:
				textaudit.PornLabelDetails = &contexts
			case Flood:
				textaudit.FloodLabelDetails = &contexts
			case Contraband:
				textaudit.ContrabandLabelDetails = &contexts
			case Ad:
				textaudit.AdLabelDetails = &contexts
			}
		}
	}
	return pass, nil
}
// parseDetailsContexts flattens the context entries of a scan-result detail
// into plain strings, one per entry and in the same order, using
// util.DerefString on each entry's Context field.
func parseDetailsContexts(contexts []*textaudit.ScanTextResponseBodyDataElementsResultsDetailsContexts) []string {
	texts := make([]string, 0, len(contexts))
	for _, entry := range contexts {
		texts = append(texts, util.DerefString(entry.Context))
	}
	return texts
}
// executeRollBack rolls a task back through _DefaultResultHandler and then
// records the outcome on the task.
//
// When the rollback succeeds the task is updated to Rollbacked; when it
// fails, the failure is logged and the task is updated to Failed with a
// remark asking for manual investigation.
//
// The returned error is the result of the status update only.
// NOTE(review): a failed rollback whose status update succeeds therefore
// returns nil — confirm this is intended.
func executeRollBack(lastValidTask *dbstruct.TextAuditTask) (err error) {
	ctx := &gin.Context{}

	// Outcome to record; defaults to a successful rollback.
	status, remarks := int64(consts.TextAudit_Rollbacked), "任务审核失败,已将字段回退至之前的版本"

	if rollbackErr := _DefaultResultHandler.Handle(ctx, lastValidTask, consts.TextAuditTaskUpdate_Rollback); rollbackErr != nil {
		logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.Id), rollbackErr)
		status, remarks = consts.TextAudit_Failed, "任务审核失败,回退失败,请联系管理员排查"
	}

	updateReq := &textaudittaskproto.OpUpdateReq{
		TextAuditTask: &dbstruct.TextAuditTask{
			Id:      lastValidTask.Id,
			Status:  goproto.Int64(status),
			Remarks: goproto.String(remarks),
		},
	}
	if err = _DefaultTextAuditTask.OpUpdate(ctx, updateReq); err != nil {
		logger.Error("_DefaultTextAuditTask OpUpdate fail: %v\n", err)
	}
	return err
}
// updatePassedTasks sets the status of the given tasks to Passed with a
// single bulk update.
//
// An empty ID list is a no-op: no update is issued, so the store is never
// asked to update by an empty ID filter. The update error, if any, is logged
// and returned.
func updatePassedTasks(passTaskIds []string) (err error) {
	if len(passTaskIds) == 0 {
		return nil
	}
	ctx := &gin.Context{}
	if err = _DefaultTextAuditTask.OpUpdateByIds(ctx, &textaudittaskproto.OpUpdateByIdsReq{
		TextAuditTask: &dbstruct.TextAuditTask{
			Status: goproto.Int64(consts.TextAudit_Passed),
		},
		Ids: passTaskIds,
	}); err != nil {
		logger.Error("_DefaultTextAuditTask OpUpdateByIds fail: %v\n", err)
		return err
	}
	return nil
}
// updateExpiredTasks sets the status of the given tasks to Expired with a
// single bulk update.
//
// An empty ID list is a no-op: no update is issued, so the store is never
// asked to update by an empty ID filter. The update error, if any, is logged
// and returned.
func updateExpiredTasks(expiredTaskIds []string) (err error) {
	if len(expiredTaskIds) == 0 {
		return nil
	}
	ctx := &gin.Context{}
	if err = _DefaultTextAuditTask.OpUpdateByIds(ctx, &textaudittaskproto.OpUpdateByIdsReq{
		TextAuditTask: &dbstruct.TextAuditTask{
			Status: goproto.Int64(consts.TextAudit_Expired),
		},
		Ids: expiredTaskIds,
	}); err != nil {
		logger.Error("_DefaultTextAuditTask OpUpdateByIds fail: %v\n", err)
		return err
	}
	return nil
}
// handleBatchError marks a whole batch as failed and rolls its tasks back.
//
// It first sets every text-audit record of the batch to ServiceFailed, then
// sets every task of the batch to ServiceFailed with _err's message as the
// remark (empty when _err is nil). If either bulk update fails, the error is
// logged and returned and no rollback is attempted.
//
// Each task is then rolled back through _DefaultResultHandler. A task whose
// rollback fails is logged and updated to Failed with a remark asking for
// manual investigation.
//
// The returned error is the bulk-update error, or otherwise the outcome of
// the last task processed in the rollback loop: nil if its rollback
// succeeded, else the result of its status update.
func handleBatchError(ctrlBlock *TextAuditTaskBatchControlBlock, _err error) (err error) {
	logger.Info("All tasks of this batchId: %v has failed, rolling back...", ctrlBlock.BatchId)
	ctx := &gin.Context{}

	// Tolerate a nil cause rather than panicking on _err.Error().
	reason := ""
	if _err != nil {
		reason = _err.Error()
	}

	if err = _DefaultTextAudit.OpUpdateByBatchId(ctx, ctrlBlock.BatchId, &dbstruct.TextAudit{
		Status:  goproto.Int64(consts.TextAudit_ServiceFailed),
		Remarks: goproto.String("批次任务失败原因详见task表"),
	}); err != nil {
		logger.Error("_DefaultTextAudit OpUpdateByBatchId fail: %v\n", err)
		return err
	}
	if err = _DefaultTextAuditTask.OpUpdateByBatchId(ctx, ctrlBlock.BatchId, &dbstruct.TextAuditTask{
		Status:  goproto.Int64(consts.TextAudit_ServiceFailed),
		Remarks: goproto.String(reason),
	}); err != nil {
		logger.Error("_DefaultTextAuditTask OpUpdateByBatchId fail: %v\n", err)
		return err
	}

	// Roll back every task of the batch.
	for _, taskCtrlBlock := range ctrlBlock.TaskCtrlBlocks {
		task := taskCtrlBlock.TextAuditTask
		if err = _DefaultResultHandler.Handle(ctx, task, consts.TextAuditTaskUpdate_Rollback); err == nil {
			continue
		}
		// Log the rollback failure before err is reused for the status update.
		logger.Error("Roll back taskId:%v fail:%v", util.DerefString(task.Id), err)
		if err = _DefaultTextAuditTask.OpUpdate(ctx, &textaudittaskproto.OpUpdateReq{
			TextAuditTask: &dbstruct.TextAuditTask{
				Id:      task.Id,
				Status:  goproto.Int64(consts.TextAudit_Failed),
				Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
			},
		}); err != nil {
			logger.Error("_DefaultTextAuditTask OpUpdate fail: %v\n", err)
		}
	}
	return err
}