service/library/contentaudit/textaudit/textaudit.go

295 lines
9.9 KiB
Go
Raw Normal View History

2023-12-21 22:17:40 +08:00
package textaudit
import (
"service/api/consts"
textauditproto "service/api/proto/textaudit/proto"
textaudittaskproto "service/api/proto/textaudittask/proto"
"service/bizcommon/util"
"service/dbstruct"
"service/library/logger"
textaudit "github.com/alibabacloud-go/imageaudit-20191230/v3/client"
teautils "github.com/alibabacloud-go/tea-utils/v2/service"
"github.com/alibabacloud-go/tea/tea"
goproto "google.golang.org/protobuf/proto"
"github.com/gin-gonic/gin"
)
// executeTextAuditTasks is the main entry point of the text audit: it packs
// the given tasks into one ScanText request, sends it to the Alibaba Cloud
// audit service and passes the response to handleScanTextResponse.
//
// An empty batch is a no-op. If building the request or calling the remote
// service fails, the error is logged and returned, and the response is not
// processed (on failure there is no usable response to process).
func executeTextAuditTasks(tasks []*TextAuditTaskControlBlock, batchId string) (err error) {
	if len(tasks) == 0 {
		return nil
	}
	logger.Info("Text audit batch started, batchId : %v, task number : %v\n", batchId, len(tasks))

	// 1. Build the request.
	req, actionMap, err := createScanTextRequest(tasks, batchId)
	if err != nil {
		logger.Info("Create Scan TextRequest fail: %v", err)
		return err
	}

	// 2. Call the Alibaba Cloud service and wait for the reply.
	runtime := &teautils.RuntimeOptions{
		ConnectTimeout: tea.Int(30000),
	}
	resp, err := defaultTextAuditClient.ScanTextWithOptions(req, runtime)
	if err != nil {
		logger.Error("ScanTextWithOptions fail : %v", err)
		// Stop here: continuing would hand a nil response to the handler.
		return err
	}

	// 3. Process the reply.
	return handleScanTextResponse(resp, batchId, tasks, actionMap)
}
// createScanTextRequest packs the audited text of every task into a single
// ScanTextRequest and groups the tasks by their ActionId.
//
// It returns the request, a map from ActionId to the TextAuditAction on which
// every task with that id has been recorded, and an error that is always nil
// in the current implementation. batchId is accepted but not used here.
func createScanTextRequest(tasks []*TextAuditTaskControlBlock, batchId string) (request *textaudit.ScanTextRequest, actionMap map[string]*TextAuditAction, err error) {
	// ActionId -> action lookup table.
	actionMap = make(map[string]*TextAuditAction)

	// One request task per audit task, in the same order as tasks.
	packed := make([]*textaudit.ScanTextRequestTasks, 0)
	for _, t := range tasks {
		act := actionMap[t.ActionId]
		if act == nil {
			act = NewTextAuditAction()
			actionMap[t.ActionId] = act
		}
		act.Record(t)

		packed = append(packed, &textaudit.ScanTextRequestTasks{
			Content: t.TextAuditTask.AuditedText,
		})
	}

	// One request label per entry of the package-level labels list.
	scanLabels := make([]*textaudit.ScanTextRequestLabels, len(labels))
	for i := 0; i < len(scanLabels); i++ {
		scanLabels[i] = &textaudit.ScanTextRequestLabels{
			Label: goproto.String(labels[i]),
		}
	}

	request = &textaudit.ScanTextRequest{
		Labels: scanLabels,
		Tasks:  packed,
	}
	logger.Info("本次打包:%v", packed)
	return request, actionMap, nil
}
// handleScanTextResponse applies the audit results in resp to tasks.
//
// Result i is matched to tasks[i]. For every matched pair it:
//  1. stores the audit result through handleTextAudit,
//  2. records the verdict on the task (IsTaskPassed),
//  3. notifies the task's action through handleTaskAction, and
//  4. once all tasks of that action have been audited, calls finalizeTask.
//
// A failure on one task is logged and does not stop the remaining tasks from
// being processed; the first such error is returned. If the response carries
// no data, this is logged and nil is returned because there is nothing to
// process. If the number of results differs from the number of tasks, the
// mismatch is logged and only the overlapping prefix is processed.
func handleScanTextResponse(resp *textaudit.ScanTextResponse, batchId string, tasks []*TextAuditTaskControlBlock, actionMap map[string]*TextAuditAction) (err error) {
	if resp == nil || resp.Body == nil || resp.Body.Data == nil {
		logger.Error("Text audit response has no data, batchId : %v, task number : %v", batchId, len(tasks))
		return nil
	}

	results := resp.Body.Data.Elements
	if len(results) != len(tasks) {
		logger.Error("Text audit result number mismatch, batchId : %v, result number : %v, task number : %v", batchId, len(results), len(tasks))
	}

	for i, result := range results {
		if i >= len(tasks) {
			// More results than tasks: nothing left to match them against.
			break
		}
		task := tasks[i]

		// 1. Immediately persist this audit result.
		pass, auditErr := handleTextAudit(task.TextAuditTask.TextAuditId, result)
		if auditErr != nil {
			logger.Error("handleTextAudit fail: %v", auditErr)
			if err == nil {
				err = auditErr
			}
		}

		// 2. Update the task state.
		task.IsTaskPassed = pass

		// 3. Look up the action chain this task belongs to and mark the task
		// as audited.
		completed, action := handleTaskAction(task, actionMap)
		if !completed {
			continue
		}

		// 4. Every task of the chain has arrived: settle the action.
		if finalizeErr := finalizeTask(action); finalizeErr != nil {
			logger.Error("finalizeTask fail: %v", finalizeErr)
			if err == nil {
				err = finalizeErr
			}
		}
	}
	return err
}
// handleTextAudit stores the outcome of one audited text.
//
// It builds a dbstruct.TextAudit carrying id, fills it from result through
// copyScanResultInfo and writes it with _DefaultTextAudit.OpUpdate. pass is
// the verdict reported by copyScanResultInfo; err is the first failure of
// either step (already logged here). When copying fails, no update is issued.
func handleTextAudit(id *string, result *textaudit.ScanTextResponseBodyDataElements) (pass bool, err error) {
	record := &dbstruct.TextAudit{Id: id}

	if pass, err = copyScanResultInfo(record, result); err != nil {
		logger.Error("Copy Scan Result Info fail: %v\n", err)
		return pass, err
	}

	updateReq := &textauditproto.OpUpdateReq{TextAudit: record}
	if err = _DefaultTextAudit.OpUpdate(&gin.Context{}, updateReq); err != nil {
		logger.Error("_DefaultTextAudit OpUpdate fail: %v\n", err)
	}
	return pass, err
}
// handleTaskAction looks up the action that task belongs to (by ActionId) and
// notifies it that one more of its tasks has been audited.
//
// It returns that action and whether its audited-task counter has now reached
// its total task count.
func handleTaskAction(task *TextAuditTaskControlBlock, actionMap map[string]*TextAuditAction) (isActionCompleted bool, action *TextAuditAction) {
	owner := actionMap[task.ActionId]
	owner.AuditedTaskNum++
	return owner.TaskNum == owner.AuditedTaskNum, owner
}
// finalizeTask settles an action after every task of its chain has been
// audited, deciding whether this batch's operation on the field succeeded.
//
// The chain is walked in order and each task gets a final state:
//   - a passed task is collected as passed; if the current rollback candidate
//     is still marked for rollback it becomes expired, since it is no longer
//     used for rolling back;
//   - a failed task seen while the chain is in the passing state becomes the
//     new rollback candidate and the chain switches to the failing state;
//   - a failed task seen while the chain is already failing is expired: its
//     predecessor failed too, so it cannot be used for rolling back.
//
// If action.IsPassed() reports false, the rollback candidate is rolled back
// through executeRollBack. Afterwards the passed and expired tasks are
// updated. The first error stops the sequence and is returned.
func finalizeTask(action *TextAuditAction) (err error) {
	chain := action.TaskChain

	var (
		// Last failed task usable for a rollback, and its position in the
		// chain; only meaningful when the chain does not end in success.
		rollbackTask = chain[0]
		rollbackIdx  = 0

		passedIds    = make([]string, 0)         // ids of all tasks that passed the audit
		expiredIds   = make([]string, 0)         // ids of all tasks that are expired
		finalStates  = make([]int64, len(chain)) // final state per task: passed, rollbacked or expired
		chainPassing = true                      // whether the chain is currently in the passing state
	)

	for idx, cur := range chain {
		switch {
		case cur.IsTaskPassed:
			passedIds = append(passedIds, util.DerefString(cur.TextAuditTask.Id))
			finalStates[idx] = consts.TextAudit_Passed
			chainPassing = true
			// A success after a failure supersedes the pending rollback
			// candidate, which therefore expires.
			if finalStates[rollbackIdx] == consts.TextAudit_Rollbacked {
				finalStates[rollbackIdx] = consts.TextAudit_Expired
				expiredIds = append(expiredIds, util.DerefString(chain[rollbackIdx].TextAuditTask.Id))
			}
		case chainPassing:
			// First failure after a passing stretch: newest rollback candidate.
			rollbackTask, rollbackIdx = cur, idx
			finalStates[idx] = consts.TextAudit_Rollbacked
			chainPassing = false
		default:
			// Failure on top of a failure: this task is expired.
			finalStates[idx] = consts.TextAudit_Expired
			expiredIds = append(expiredIds, util.DerefString(cur.TextAuditTask.Id))
		}
	}

	// The chain did not end in success: roll back and record it.
	if !action.IsPassed() {
		if err = executeRollBack(rollbackTask); err != nil {
			return err
		}
	}

	// Update the state of all remaining tasks.
	if err = updatePassedTasks(passedIds); err != nil {
		return err
	}
	if err = updateExpiredTasks(expiredIds); err != nil {
		return err
	}
	logger.Info("action statuses : %v", finalStates)
	return nil
}
// copyScanResultInfo copies the audit outcome in elements into the text audit
// entity textaudit and reports whether the text passed.
//
// pass is false as soon as any result carries a label other than LabelPassed.
// For such a result, Suggestion, Label and Rate are copied, Status is set to
// rejected, and the contexts of each detail are stored in the field matching
// the detail's label. A passing result only fills Suggestion/Label/Rate/Status
// while no rejection has been seen, so the stored summary always agrees with
// the returned verdict. Nil results and nil details are skipped.
//
// err is always nil in the current implementation.
//
// NOTE(review): an element without any result (or a nil element) leaves the
// entity untouched and reports pass == true — confirm this is intended.
func copyScanResultInfo(textaudit *dbstruct.TextAudit, elements *textaudit.ScanTextResponseBodyDataElements) (pass bool, err error) {
	pass = true
	if elements == nil {
		return pass, nil
	}

	for _, result := range elements.Results {
		if result == nil {
			continue
		}

		rejected := util.DerefString(result.Label) != LabelPassed
		if !rejected && !pass {
			// A rejection was already recorded; a later clean result must not
			// overwrite it.
			continue
		}

		textaudit.Suggestion = result.Suggestion
		textaudit.Label = result.Label
		textaudit.Rate = goproto.Float64(float64(util.DerefFloat32(result.Rate)))

		if !rejected {
			textaudit.Status = goproto.Int64(consts.TextAudit_Passed)
			continue
		}

		pass = false
		textaudit.Status = goproto.Int64(consts.TextAudit_Rejected)
		for _, detail := range result.Details {
			if detail == nil {
				continue
			}
			contexts := parseDetailsContexts(detail.Contexts)
			switch TextAuditLabel(util.DerefString(detail.Label)) {
			case Spam:
				textaudit.SpamLabelDetails = &contexts
			case Politics:
				textaudit.PoliticsLabelDetails = &contexts
			case Abuse:
				textaudit.AbuseLabelDetails = &contexts
			case Terrorism:
				textaudit.TerrorismLabelDetails = &contexts
			case Porn:
				textaudit.PornLabelDetails = &contexts
			case Flood:
				textaudit.FloodLabelDetails = &contexts
			case Contraband:
				textaudit.ContrabandLabelDetails = &contexts
			case Ad:
				textaudit.AdLabelDetails = &contexts
			}
		}
	}
	return pass, nil
}
// parseDetailsContexts extracts the Context string of every entry in contexts,
// preserving order. The returned slice is never nil and has the same length as
// the input.
func parseDetailsContexts(contexts []*textaudit.ScanTextResponseBodyDataElementsResultsDetailsContexts) []string {
	texts := make([]string, 0, len(contexts))
	for _, entry := range contexts {
		texts = append(texts, util.DerefString(entry.Context))
	}
	return texts
}
// executeRollBack runs the rollback function of lastValidTask and records the
// outcome on the task: status "rollbacked" when the rollback succeeded, status
// "failed" (with a remark asking for manual investigation) when it did not.
//
// A rollback failure is logged but not returned; the returned error is only
// the result of persisting the task status.
func executeRollBack(lastValidTask *TextAuditTaskControlBlock) (err error) {
	var status int64 = consts.TextAudit_Rollbacked
	remarks := "任务审核失败,已将字段回退至之前的版本"

	if rollbackErr := lastValidTask.RollbackFunc(); rollbackErr != nil {
		logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.TextAuditTask.Id), rollbackErr)
		status = consts.TextAudit_Failed
		remarks = "任务审核失败,回退失败,请联系管理员排查"
	}

	updateReq := &textaudittaskproto.OpUpdateReq{
		TextAuditTask: &dbstruct.TextAuditTask{
			Id:      lastValidTask.TextAuditTask.Id,
			Status:  goproto.Int64(status),
			Remarks: goproto.String(remarks),
		},
	}
	if err = _DefaultTextAuditTask.OpUpdate(&gin.Context{}, updateReq); err != nil {
		logger.Error("_DefaultTextAuditTask OpUpdate fail: %v\n", err)
	}
	return err
}
// updatePassedTasks sets the status of every task in passTaskIds to "passed".
//
// An empty id list is a no-op: no update request is issued. A failure of the
// update is logged and returned.
func updatePassedTasks(passTaskIds []string) (err error) {
	if len(passTaskIds) == 0 {
		// Nothing to update; skip the by-ids request entirely.
		return nil
	}

	ctx := &gin.Context{}
	if err = _DefaultTextAuditTask.OpUpdateByIds(ctx, &textaudittaskproto.OpUpdateByIdsReq{
		TextAuditTask: &dbstruct.TextAuditTask{
			Status: goproto.Int64(consts.TextAudit_Passed),
		},
		Ids: passTaskIds,
	}); err != nil {
		logger.Error("_DefaultTextAuditTask OpUpdateByIds fail: %v\n", err)
	}
	return err
}
// updateExpiredTasks sets the status of every task in expiredTaskIds to
// "expired".
//
// An empty id list is a no-op: no update request is issued. A failure of the
// update is logged and returned.
func updateExpiredTasks(expiredTaskIds []string) (err error) {
	if len(expiredTaskIds) == 0 {
		// Nothing to update; skip the by-ids request entirely.
		return nil
	}

	ctx := &gin.Context{}
	if err = _DefaultTextAuditTask.OpUpdateByIds(ctx, &textaudittaskproto.OpUpdateByIdsReq{
		TextAuditTask: &dbstruct.TextAuditTask{
			Status: goproto.Int64(consts.TextAudit_Expired),
		},
		Ids: expiredTaskIds,
	}); err != nil {
		logger.Error("_DefaultTextAuditTask OpUpdateByIds fail: %v\n", err)
	}
	return err
}