342 lines
12 KiB
Go
342 lines
12 KiB
Go
package textaudit
|
||
|
||
import (
|
||
"service/api/consts"
|
||
textauditproto "service/api/proto/textaudit/proto"
|
||
textaudittaskproto "service/api/proto/textaudittask/proto"
|
||
"service/bizcommon/util"
|
||
"service/dbstruct"
|
||
"service/library/logger"
|
||
|
||
textaudit "github.com/alibabacloud-go/imageaudit-20191230/v3/client"
|
||
teautils "github.com/alibabacloud-go/tea-utils/v2/service"
|
||
"github.com/alibabacloud-go/tea/tea"
|
||
goproto "google.golang.org/protobuf/proto"
|
||
|
||
"github.com/gin-gonic/gin"
|
||
)
|
||
|
||
// 刷新批次号
|
||
func RefreshBatchId() string {
|
||
batchId := defaultTextAuditTaskScheduler.batchId
|
||
defaultTextAuditTaskScheduler.batchId = genereteBatchId()
|
||
return batchId
|
||
}
|
||
|
||
func Run(batchId string) (successNum int, failNum int, err error) {
|
||
|
||
// 查询该批次所有审核任务
|
||
textaudittasks, err := _DefaultTextAuditTask.OpList(&gin.Context{}, &textaudittaskproto.OpListReq{
|
||
BatchId: goproto.String(batchId),
|
||
Status: goproto.Int64(consts.TextAudit_Created),
|
||
Sort: "ct",
|
||
})
|
||
if err != nil {
|
||
logger.Info("_DefaultTextAuditTask OpList fail: %v", err)
|
||
}
|
||
|
||
if len(textaudittasks) == 0 {
|
||
return 0, 0, nil
|
||
}
|
||
|
||
logger.Info("Text audit batch started, batchId : %v, task number : %v", batchId, len(textaudittasks))
|
||
// 1.构建批量任务控制块,初始化必要的actionMap、填充送审text等信息
|
||
ctrlBlock := NewTextAuditTaskBatchControlBlock(textaudittasks, batchId)
|
||
|
||
// 2.创建请求
|
||
req, err := createScanTextRequest(ctrlBlock)
|
||
if err != nil {
|
||
logger.Info("Create Scan TextRequest fail: %v", err)
|
||
handleBatchError(ctrlBlock, err)
|
||
return
|
||
}
|
||
|
||
// 3.调用阿里的服务,收到应答
|
||
runtime := &teautils.RuntimeOptions{
|
||
ConnectTimeout: tea.Int(30000),
|
||
}
|
||
_result, err := defaultTextAuditClient.ScanTextWithOptions(req, runtime)
|
||
if err != nil {
|
||
logger.Error("ScanTextWithOptions fail : %v", err)
|
||
handleBatchError(ctrlBlock, err)
|
||
return
|
||
}
|
||
|
||
// 4.处理应答
|
||
err = handleScanTextResponse(ctrlBlock, _result)
|
||
|
||
successNum = len(textaudittasks)
|
||
return
|
||
}
|
||
|
||
func createScanTextRequest(ctrlBlock *TextAuditTaskBatchControlBlock) (request *textaudit.ScanTextRequest, err error) {
|
||
reqTasks := make([]*textaudit.ScanTextRequestTasks, 0)
|
||
for _, taskCtrlBlock := range ctrlBlock.TaskCtrlBlocks {
|
||
reqTasks = append(reqTasks, &textaudit.ScanTextRequestTasks{
|
||
Content: taskCtrlBlock.TextAuditTask.AuditedText,
|
||
})
|
||
}
|
||
|
||
scanLabels := make([]*textaudit.ScanTextRequestLabels, len(labels))
|
||
for i := range scanLabels {
|
||
scanLabels[i] = &textaudit.ScanTextRequestLabels{
|
||
Label: goproto.String(labels[i]),
|
||
}
|
||
}
|
||
|
||
request = &textaudit.ScanTextRequest{
|
||
Labels: scanLabels,
|
||
Tasks: reqTasks,
|
||
}
|
||
logger.Info("本次打包:%v", reqTasks)
|
||
return
|
||
}
|
||
|
||
func handleScanTextResponse(ctrlBlock *TextAuditTaskBatchControlBlock, resp *textaudit.ScanTextResponse) (err error) {
|
||
results := resp.Body.Data.Elements
|
||
taskCtrlBlocks := ctrlBlock.TaskCtrlBlocks
|
||
actionMap := ctrlBlock.ActionMap
|
||
for i, result := range results {
|
||
isActionCompleted := false
|
||
action := &TextAuditAction{}
|
||
|
||
// 1.立即在textaudit-textaudit中更新该次审核结果
|
||
pass, err := handleTextAudit(taskCtrlBlocks[i].TextAuditTask.TextAuditId, result)
|
||
if err != nil {
|
||
logger.Error("handleTextAudit fail: %v", err)
|
||
}
|
||
|
||
// 2.更新任务状态
|
||
taskCtrlBlocks[i].IsTaskPassed = pass
|
||
|
||
// 3.通过task的actionId,去actionId-[]*task的map查出本批次对该字段的动作链,更新其中关于自己的状态
|
||
isActionCompleted, action = handleTaskAction(taskCtrlBlocks[i], actionMap)
|
||
|
||
// 4.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
|
||
if isActionCompleted {
|
||
if err = finalizeTask(action); err != nil {
|
||
logger.Error("finalizeTask fail: %v", err)
|
||
}
|
||
}
|
||
}
|
||
return
|
||
}
|
||
|
||
func handleTextAudit(id *string, result *textaudit.ScanTextResponseBodyDataElements) (pass bool, err error) {
|
||
ctx := &gin.Context{}
|
||
textaudit := &dbstruct.TextAudit{
|
||
Id: id,
|
||
}
|
||
pass, err = copyScanResultInfo(textaudit, result)
|
||
if err != nil {
|
||
logger.Error("Copy Scan Result Info fail: %v\n", err)
|
||
return
|
||
}
|
||
if err = _DefaultTextAudit.OpUpdate(ctx, &textauditproto.OpUpdateReq{
|
||
TextAudit: textaudit,
|
||
}); err != nil {
|
||
logger.Error("_DefaultTextAudit OpUpdate fail: %v\n", err)
|
||
return
|
||
}
|
||
return
|
||
}
|
||
|
||
// 通过task的actionId查出action, 通知该task已完成
|
||
func handleTaskAction(task *TextAuditTaskControlBlock, actionMap map[string]*TextAuditAction) (isActionCompleted bool, action *TextAuditAction) {
|
||
action = actionMap[task.ActionId]
|
||
action.AuditedTaskNum++
|
||
|
||
isActionCompleted = action.TaskNum == action.AuditedTaskNum
|
||
return
|
||
|
||
}
|
||
|
||
// 等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
|
||
func finalizeTask(action *TextAuditAction) (err error) {
|
||
lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0 // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务及其索引
|
||
passTaskIds := make([]string, 0) // 所有已审核成功的任务的taskId
|
||
expiredTaskIds := make([]string, 0) // 所有已失效任务的taskId,失效任务是指那些前驱任务依然失败的失败任务,这些任务已失去实际意义,不可以用来回退
|
||
statuses := make([]int64, len(action.TaskChain)) // 所有任务的终态状态,成功,回退,或失效
|
||
isActionInPassedStatus := true // 当前动作链是否处于成功中
|
||
for i, task := range action.TaskChain {
|
||
if !task.IsTaskPassed { // 动作链检测到失败任务
|
||
if isActionInPassedStatus { // 若动作链处于成功状态,则这是目前为止,最后一个可以用于回退的任务
|
||
lastValidTask, lastValidTaskIndex = task, i
|
||
statuses[i] = consts.TextAudit_Rollbacked
|
||
isActionInPassedStatus = false
|
||
} else { // 否则,若动作链处于失败状态,则该任务已失效
|
||
statuses[i] = consts.TextAudit_Expired
|
||
expiredTaskIds = append(expiredTaskIds, util.DerefString(task.TextAuditTask.Id))
|
||
}
|
||
} else { // 若动作链检测到成功任务,则重置动作链任务态,并将最后有效任务标志为失效(它已不再用于回退)
|
||
passTaskIds = append(passTaskIds, util.DerefString(task.TextAuditTask.Id))
|
||
statuses[i] = consts.TextAudit_Passed
|
||
isActionInPassedStatus = true
|
||
if statuses[lastValidTaskIndex] == consts.TextAudit_Rollbacked {
|
||
statuses[lastValidTaskIndex] = consts.TextAudit_Expired
|
||
expiredTaskIds = append(expiredTaskIds, util.DerefString(action.TaskChain[lastValidTaskIndex].TextAuditTask.Id))
|
||
}
|
||
}
|
||
}
|
||
// 判定任务链终态,终态非成功,执行回退操作,记录到数据库中
|
||
if !action.IsPassed() {
|
||
if err = executeRollBack(lastValidTask.TextAuditTask); err != nil {
|
||
return
|
||
}
|
||
}
|
||
|
||
// 更新所有任务状态
|
||
if err = updatePassedTasks(passTaskIds); err != nil {
|
||
return
|
||
}
|
||
if err = updateExpiredTasks(expiredTaskIds); err != nil {
|
||
return
|
||
}
|
||
|
||
logger.Info("action statuses : %v", statuses)
|
||
|
||
return
|
||
}
|
||
|
||
// 将文字审核结果copy到文字审核实体中
|
||
func copyScanResultInfo(textaudit *dbstruct.TextAudit, elements *textaudit.ScanTextResponseBodyDataElements) (pass bool, err error) {
|
||
|
||
pass = true
|
||
for _, result := range elements.Results {
|
||
textaudit.Suggestion = result.Suggestion
|
||
textaudit.Label = result.Label
|
||
textaudit.Rate = goproto.Float64(float64(util.DerefFloat32(result.Rate)))
|
||
if util.DerefString(textaudit.Label) == LabelPassed {
|
||
textaudit.Status = goproto.Int64(consts.TextAudit_Passed)
|
||
} else {
|
||
pass = false
|
||
textaudit.Status = goproto.Int64(consts.TextAudit_Rejected)
|
||
for _, detail := range result.Details {
|
||
label := TextAuditLabel(util.DerefString(detail.Label))
|
||
contexts := parseDetailsContexts(detail.Contexts)
|
||
switch label {
|
||
case Spam:
|
||
textaudit.SpamLabelDetails = &contexts
|
||
case Politics:
|
||
textaudit.PoliticsLabelDetails = &contexts
|
||
case Abuse:
|
||
textaudit.AbuseLabelDetails = &contexts
|
||
case Terrorism:
|
||
textaudit.TerrorismLabelDetails = &contexts
|
||
case Porn:
|
||
textaudit.PornLabelDetails = &contexts
|
||
case Flood:
|
||
textaudit.FloodLabelDetails = &contexts
|
||
case Contraband:
|
||
textaudit.ContrabandLabelDetails = &contexts
|
||
case Ad:
|
||
textaudit.AdLabelDetails = &contexts
|
||
}
|
||
}
|
||
}
|
||
}
|
||
return
|
||
}
|
||
|
||
func parseDetailsContexts(contexts []*textaudit.ScanTextResponseBodyDataElementsResultsDetailsContexts) []string {
|
||
ctxs := make([]string, len(contexts))
|
||
for i, context := range contexts {
|
||
ctxs[i] = util.DerefString(context.Context)
|
||
}
|
||
return ctxs
|
||
}
|
||
|
||
func executeRollBack(lastValidTask *dbstruct.TextAuditTask) (err error) {
|
||
ctx := &gin.Context{}
|
||
if err = _DefaultResultHandler.Handle(ctx, lastValidTask, consts.TextAuditTaskUpdate_Rollback); err != nil {
|
||
logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.Id), err)
|
||
if err = _DefaultTextAuditTask.OpUpdate(ctx, &textaudittaskproto.OpUpdateReq{
|
||
TextAuditTask: &dbstruct.TextAuditTask{
|
||
Id: lastValidTask.Id,
|
||
Status: goproto.Int64(consts.TextAudit_Failed),
|
||
Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
|
||
},
|
||
}); err != nil {
|
||
logger.Error("_DefaultTextAuditTask OpUpdate fail: %v\n", err)
|
||
}
|
||
return
|
||
}
|
||
if err = _DefaultTextAuditTask.OpUpdate(ctx, &textaudittaskproto.OpUpdateReq{
|
||
TextAuditTask: &dbstruct.TextAuditTask{
|
||
Id: lastValidTask.Id,
|
||
Status: goproto.Int64(consts.TextAudit_Rollbacked),
|
||
Remarks: goproto.String("任务审核失败,已将字段回退至之前的版本"),
|
||
},
|
||
}); err != nil {
|
||
logger.Error("_DefaultTextAuditTask OpUpdate fail: %v\n", err)
|
||
}
|
||
return
|
||
}
|
||
|
||
func updatePassedTasks(passTaskIds []string) (err error) {
|
||
ctx := &gin.Context{}
|
||
if err = _DefaultTextAuditTask.OpUpdateByIds(ctx, &textaudittaskproto.OpUpdateByIdsReq{
|
||
TextAuditTask: &dbstruct.TextAuditTask{
|
||
Status: goproto.Int64(consts.TextAudit_Passed),
|
||
},
|
||
Ids: passTaskIds,
|
||
}); err != nil {
|
||
logger.Error("_DefaultTextAuditTask OpUpdateByIds fail: %v\n", err)
|
||
return
|
||
}
|
||
return
|
||
}
|
||
|
||
func updateExpiredTasks(expiredTaskIds []string) (err error) {
|
||
ctx := &gin.Context{}
|
||
if err = _DefaultTextAuditTask.OpUpdateByIds(ctx, &textaudittaskproto.OpUpdateByIdsReq{
|
||
TextAuditTask: &dbstruct.TextAuditTask{
|
||
Status: goproto.Int64(consts.TextAudit_Expired),
|
||
},
|
||
Ids: expiredTaskIds,
|
||
}); err != nil {
|
||
logger.Error("_DefaultTextAuditTask OpUpdateByIds fail: %v\n", err)
|
||
return
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
func handleBatchError(ctrlBlock *TextAuditTaskBatchControlBlock, _err error) (err error) {
|
||
logger.Info("All tasks of this batchId: %v has failed, rolling back...", ctrlBlock.BatchId)
|
||
ctx := &gin.Context{}
|
||
if err = _DefaultTextAudit.OpUpdateByBatchId(ctx, ctrlBlock.BatchId, &dbstruct.TextAudit{
|
||
Status: goproto.Int64(consts.TextAudit_ServiceFailed),
|
||
Remarks: goproto.String("批次任务失败,原因详见task表"),
|
||
}); err != nil {
|
||
logger.Error("_DefaultTextAudit OpUpdateByBatchId fail: %v\n", err)
|
||
return
|
||
}
|
||
|
||
if err = _DefaultTextAuditTask.OpUpdateByBatchId(ctx, ctrlBlock.BatchId, &dbstruct.TextAuditTask{
|
||
Status: goproto.Int64(consts.TextAudit_ServiceFailed),
|
||
Remarks: goproto.String(_err.Error()),
|
||
}); err != nil {
|
||
logger.Error("_DefaultTextAuditTask OpUpdateByBatchId fail: %v\n", err)
|
||
return
|
||
}
|
||
|
||
// 回退
|
||
for _, taskCtrlBlock := range ctrlBlock.TaskCtrlBlocks {
|
||
task := taskCtrlBlock.TextAuditTask
|
||
if err = _DefaultResultHandler.Handle(ctx, task, consts.TextAuditTaskUpdate_Rollback); err != nil {
|
||
if err = _DefaultTextAuditTask.OpUpdate(ctx, &textaudittaskproto.OpUpdateReq{
|
||
TextAuditTask: &dbstruct.TextAuditTask{
|
||
Id: task.Id,
|
||
Status: goproto.Int64(consts.TextAudit_Failed),
|
||
Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
|
||
},
|
||
}); err != nil {
|
||
logger.Error("_DefaultTextAudit OpUpdate fail: %v\n", err)
|
||
}
|
||
}
|
||
}
|
||
|
||
return
|
||
}
|