package textaudit

import (
	"errors"
	"fmt"

	"service/api/consts"
	textauditproto "service/api/proto/textaudit/proto"
	textaudittaskproto "service/api/proto/textaudittask/proto"
	"service/bizcommon/util"
	"service/dbstruct"
	"service/library/logger"

	textaudit "github.com/alibabacloud-go/imageaudit-20191230/v3/client"
	teautils "github.com/alibabacloud-go/tea-utils/v2/service"
	"github.com/alibabacloud-go/tea/tea"
	goproto "google.golang.org/protobuf/proto"

	"github.com/gin-gonic/gin"
)

// RefreshBatchId stores a freshly generated batch id through
// _DefaultContentAuditRTI.GetAndUpdateTextAuditBatchId and returns the batch
// id that call hands back.
func RefreshBatchId() (string, error) {
	batchId, err := _DefaultContentAuditRTI.GetAndUpdateTextAuditBatchId(&gin.Context{}, genereteBatchId())
	if err != nil {
		logger.Info("_DefaultContentAuditRTI GetAndUpdateTextAuditBatchId fail: %v", err)
		return "", err
	}
	return batchId, nil
}

// Run audits every task of the batch batchId that is still in the
// consts.TextAudit_Created state.
//
// It lists the tasks, builds the batch control block, packs the tasks into
// ScanText requests, sends them one by one and finally processes the returned
// elements.
//
// Returns:
//   - successNum: the number of tasks in the batch once every request has
//     been answered (set even if processing the answers reported an error).
//   - failNum: the number of tasks in the batch when the batch failed as a
//     whole (request creation or a ScanText call failed).
//   - err: the listing error, the batch failure, or the first error reported
//     while processing the answers.
func Run(batchId string) (successNum int, failNum int, err error) {
	// Load every task of this batch that is still waiting to be audited.
	textaudittasks, err := _DefaultTextAuditTask.OpList(&gin.Context{}, &textaudittaskproto.OpListReq{
		BatchId: goproto.String(batchId),
		Status:  goproto.Int64(consts.TextAudit_Created),
		Sort:    "ct",
	})
	if err != nil {
		logger.Info("_DefaultTextAuditTask OpList fail: %v", err)
		// Surface the failure instead of reporting an empty, successful batch.
		return 0, 0, err
	}
	if len(textaudittasks) == 0 {
		return 0, 0, nil
	}
	logger.Info("Text audit batch started, batchId : %v, task number : %v", batchId, len(textaudittasks))

	// 1. Build the batch control block (action map, texts to audit, ...).
	ctrlBlock := NewTextAuditTaskBatchControlBlock(textaudittasks, batchId)

	// 2. Create the requests.
	reqs, err := createScanTextRequest(ctrlBlock)
	if err != nil {
		logger.Info("Create Scan TextRequest fail: %v", err)
		// handleBatchError logs its own failures; the batch error is what the caller needs.
		_ = handleBatchError(ctrlBlock, err)
		return 0, len(textaudittasks), err
	}

	// 3. Call the audit service and collect the answers in request order.
	runtime := &teautils.RuntimeOptions{
		ConnectTimeout: tea.Int(30000),
	}
	results := make([]*textaudit.ScanTextResponseBodyDataElements, 0, len(textaudittasks))
	for _, req := range reqs {
		resp, scanErr := defaultTextAuditClient.ScanTextWithOptions(req, runtime)
		if scanErr == nil && (resp == nil || resp.Body == nil || resp.Body.Data == nil) {
			// Without body data there is nothing to match against the tasks;
			// treat it like a failed call rather than dereferencing nil.
			scanErr = errors.New("ScanTextWithOptions returned a response without body data")
		}
		if scanErr != nil {
			logger.Error("ScanTextWithOptions fail : %v", scanErr)
			// handleBatchError logs its own failures; the batch error is what the caller needs.
			_ = handleBatchError(ctrlBlock, scanErr)
			return 0, len(textaudittasks), scanErr
		}
		results = append(results, resp.Body.Data.Elements...)
		logger.Info("Receive the response from ScanTextWithOptions: %v", resp.String())
	}

	// 4. Process the answers.
	err = handleScanTextResponseBodyDataElements(ctrlBlock, results)
	successNum = len(textaudittasks)
	return successNum, failNum, err
}

// createScanTextRequest packs the audited text of every task in ctrlBlock into
// ScanText requests of at most ten tasks each, all carrying the package-level
// labels. The returned error is always nil.
func createScanTextRequest(ctrlBlock *TextAuditTaskBatchControlBlock) (requests []*textaudit.ScanTextRequest, err error) {
	// One request carries at most this many texts, so the tasks are split into chunks.
	const maxTasksPerRequest = 10

	reqTasks := make([]*textaudit.ScanTextRequestTasks, 0, len(ctrlBlock.TaskCtrlBlocks))
	for _, taskCtrlBlock := range ctrlBlock.TaskCtrlBlocks {
		reqTasks = append(reqTasks, &textaudit.ScanTextRequestTasks{
			Content: taskCtrlBlock.TextAuditTask.AuditedText,
		})
	}

	scanLabels := make([]*textaudit.ScanTextRequestLabels, len(labels))
	for i := range scanLabels {
		scanLabels[i] = &textaudit.ScanTextRequestLabels{
			Label: goproto.String(labels[i]),
		}
	}

	requests = make([]*textaudit.ScanTextRequest, 0, (len(reqTasks)+maxTasksPerRequest-1)/maxTasksPerRequest)
	for start := 0; start < len(reqTasks); start += maxTasksPerRequest {
		end := start + maxTasksPerRequest
		if end > len(reqTasks) {
			end = len(reqTasks)
		}
		requests = append(requests, &textaudit.ScanTextRequest{
			Labels: scanLabels,
			Tasks:  reqTasks[start:end],
		})
	}
	logger.Info("本次打包:%v", reqTasks)
	return requests, nil
}

// handleScanTextResponseBodyDataElements applies the scan results to the tasks
// of ctrlBlock. results[i] is matched with ctrlBlock.TaskCtrlBlocks[i].
//
// For every pair it stores the audit result, marks the task as passed or not,
// notifies the task's action, and finalizes the action once all of its tasks
// have been audited. Processing continues after a failure; the first error
// encountered (including a result/task count mismatch) is returned.
func handleScanTextResponseBodyDataElements(ctrlBlock *TextAuditTaskBatchControlBlock, results []*textaudit.ScanTextResponseBodyDataElements) (err error) {
	taskCtrlBlocks := ctrlBlock.TaskCtrlBlocks
	actionMap := ctrlBlock.ActionMap

	// Results are matched to tasks by position, so only the overlapping range
	// can be processed; a mismatch is reported instead of indexing out of range.
	count := len(results)
	if count != len(taskCtrlBlocks) {
		err = fmt.Errorf("scan result count %d does not match task count %d", len(results), len(taskCtrlBlocks))
		logger.Error("handleScanTextResponseBodyDataElements fail: %v", err)
		if count > len(taskCtrlBlocks) {
			count = len(taskCtrlBlocks)
		}
	}

	for i := 0; i < count; i++ {
		taskCtrlBlock := taskCtrlBlocks[i]

		// 1. Persist this audit result right away.
		logger.Info("text_audit_id:%v", util.DerefString(taskCtrlBlock.TextAuditTask.TextAuditId))
		pass, audit, auditErr := handleTextAudit(taskCtrlBlock.TextAuditTask.TextAuditId, results[i])
		if auditErr != nil {
			logger.Error("handleTextAudit fail: %v", auditErr)
			if err == nil {
				err = auditErr
			}
		}

		// 2. Update the in-memory task state.
		taskCtrlBlock.IsTaskPassed = pass
		taskCtrlBlock.TextAuditTask.TextAudit = audit

		// 3. Tell the task's action that this task has been audited.
		isActionCompleted, action := handleTaskAction(taskCtrlBlock, actionMap)
		if !isActionCompleted {
			continue
		}

		// 4. Every task of the action has arrived: decide the action's outcome.
		logger.Info("entering finalizeTask...")
		if finalizeErr := finalizeTask(action); finalizeErr != nil {
			logger.Error("finalizeTask fail: %v", finalizeErr)
			if err == nil {
				err = finalizeErr
			}
		}
	}
	return err
}

// handleTextAudit copies the scan result into a TextAudit entity with the
// given id and persists it through _DefaultTextAudit.OpUpdate.
//
// The returned entity is non-nil even when an error is returned; pass holds
// whatever copyScanResultInfo computed.
func handleTextAudit(id *string, result *textaudit.ScanTextResponseBodyDataElements) (pass bool, textaudit *dbstruct.TextAudit, err error) {
	ctx := &gin.Context{}
	textaudit = &dbstruct.TextAudit{
		Id: id,
	}

	logger.Info("entering copyScanResultInfo...")
	pass, err = copyScanResultInfo(textaudit, result)
	if err != nil {
		logger.Error("Copy Scan Result Info fail: %v\n", err)
		return pass, textaudit, err
	}

	logger.Info("entering _DefaultTextAudit.OpUpdate...")
	err = _DefaultTextAudit.OpUpdate(ctx, &textauditproto.OpUpdateReq{
		TextAudit: textaudit,
	})
	if err != nil {
		logger.Error("_DefaultTextAudit OpUpdate fail: %v\n", err)
		return pass, textaudit, err
	}
	return pass, textaudit, nil
}

// handleTaskAction looks up the action of task by its ActionId, counts the
// task as audited and reports whether all tasks of the action have now been
// audited. If no action is registered for the id it returns (false, nil).
func handleTaskAction(task *TextAuditTaskControlBlock, actionMap map[string]*TextAuditAction) (isActionCompleted bool, action *TextAuditAction) {
	logger.Info("entering handleTaskAction...")
	action = actionMap[task.ActionId]
	if action == nil {
		logger.Error("handleTaskAction fail: no action found for actionId: %v", task.ActionId)
		return false, nil
	}
	action.AuditedTaskNum++
	return action.TaskNum == action.AuditedTaskNum, action
}

// finalizeTask decides the outcome of an action once all of its tasks have
// been audited.
//
// It walks the task chain in order and classifies each task as passed,
// rollback candidate, or expired. If the action did not pass, the last
// rollback candidate is rolled back; then passed and expired task statuses
// are persisted; if the action passed, the success handler runs for the last
// task of the chain. A nil action or an empty chain is a no-op.
func finalizeTask(action *TextAuditAction) (err error) {
	if action == nil || len(action.TaskChain) == 0 {
		logger.Error("finalizeTask fail: action has no task chain")
		return nil
	}

	// Only meaningful when the chain does not end in success: the last failed
	// task that can still be used for a rollback, and its index.
	lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0
	// Ids of all tasks that passed the audit.
	passTaskIds := make([]string, 0, len(action.TaskChain))
	// Ids of all expired tasks, i.e. failed tasks that cannot be used for a
	// rollback (their predecessor failed too, or a later task passed).
	expiredTaskIds := make([]string, 0, len(action.TaskChain))
	// Final status of every task: passed, rollbacked or expired.
	statuses := make([]int64, len(action.TaskChain))
	// Whether the chain is currently in the passed state.
	isActionInPassedStatus := true

	for i, task := range action.TaskChain {
		switch {
		case task.IsTaskPassed:
			// A passed task resets the chain state; the previous rollback
			// candidate, if any, is no longer needed and expires.
			passTaskIds = append(passTaskIds, util.DerefString(task.TextAuditTask.Id))
			statuses[i] = consts.TextAudit_Passed
			isActionInPassedStatus = true
			if statuses[lastValidTaskIndex] == consts.TextAudit_Rollbacked {
				statuses[lastValidTaskIndex] = consts.TextAudit_Expired
				expiredTaskIds = append(expiredTaskIds, util.DerefString(action.TaskChain[lastValidTaskIndex].TextAuditTask.Id))
			}
		case isActionInPassedStatus:
			// First failure after a passed state: this is the latest task that
			// can be used for a rollback.
			lastValidTask, lastValidTaskIndex = task, i
			statuses[i] = consts.TextAudit_Rollbacked
			isActionInPassedStatus = false
		default:
			// Failure while the chain is already failed: the task is expired.
			statuses[i] = consts.TextAudit_Expired
			expiredTaskIds = append(expiredTaskIds, util.DerefString(task.TextAuditTask.Id))
		}
	}

	logger.Info("reday for handler execution...")
	// The chain did not end in success: roll back and record it.
	if !action.IsPassed() {
		if err = executeRollBack(lastValidTask.TextAuditTask); err != nil {
			return err
		}
	}

	// Persist the status of all passed and expired tasks.
	if err = updatePassedTasks(passTaskIds); err != nil {
		return err
	}
	if err = updateExpiredTasks(expiredTaskIds); err != nil {
		return err
	}

	// The chain ended in success: run the success handler for the last task.
	logger.Info("reday for success...")
	if action.IsPassed() {
		if err = handleSuccess(action.TaskChain[len(action.TaskChain)-1].TextAuditTask); err != nil {
			return err
		}
	}
	logger.Info("action statuses : %v", statuses)
	return nil
}

// copyScanResultInfo copies the scan result into the text audit entity and
// reports whether the text passed.
//
// A nil element or an element without results marks the audit as
// consts.TextAudit_ServiceFailed and returns pass == false. Otherwise pass is
// false as soon as one result carries a label other than LabelPassed; for such
// results the per-label details and a not-pass scene are recorded. The
// returned error is always nil.
//
// NOTE(review): with several results, Suggestion, Label, Rate and Status end
// up holding the values of the last result, while pass reflects all of them.
func copyScanResultInfo(textaudit *dbstruct.TextAudit, elements *textaudit.ScanTextResponseBodyDataElements) (pass bool, err error) {
	if elements == nil || len(elements.Results) == 0 {
		textaudit.Status = goproto.Int64(consts.TextAudit_ServiceFailed)
		textaudit.NotPassScenes = append(textaudit.NotPassScenes, "机审失败")
		return false, nil
	}

	pass = true
	for _, result := range elements.Results {
		textaudit.Suggestion = result.Suggestion
		textaudit.Label = result.Label
		textaudit.Rate = goproto.Float64(float64(util.DerefFloat32(result.Rate)))

		if util.DerefString(textaudit.Label) == LabelPassed {
			textaudit.Status = goproto.Int64(consts.TextAudit_Passed)
			continue
		}

		pass = false
		textaudit.Status = goproto.Int64(consts.TextAudit_Rejected)
		for _, detail := range result.Details {
			label := TextAuditLabel(util.DerefString(detail.Label))
			contexts := parseDetailsContexts(detail.Contexts)
			switch label {
			case Spam:
				textaudit.NotPassScenes = append(textaudit.NotPassScenes, "垃圾内容")
				textaudit.SpamLabelDetails = &contexts
			case Politics:
				textaudit.NotPassScenes = append(textaudit.NotPassScenes, "敏感内容")
				textaudit.PoliticsLabelDetails = &contexts
			case Abuse:
				textaudit.NotPassScenes = append(textaudit.NotPassScenes, "辱骂内容")
				textaudit.AbuseLabelDetails = &contexts
			case Terrorism:
				textaudit.NotPassScenes = append(textaudit.NotPassScenes, "暴恐内容")
				textaudit.TerrorismLabelDetails = &contexts
			case Porn:
				textaudit.NotPassScenes = append(textaudit.NotPassScenes, "色情内容")
				textaudit.PornLabelDetails = &contexts
			case Flood:
				textaudit.NotPassScenes = append(textaudit.NotPassScenes, "灌水内容")
				textaudit.FloodLabelDetails = &contexts
			case Contraband:
				textaudit.NotPassScenes = append(textaudit.NotPassScenes, "违禁内容")
				textaudit.ContrabandLabelDetails = &contexts
			case Ad:
				textaudit.NotPassScenes = append(textaudit.NotPassScenes, "广告内容")
				textaudit.AdLabelDetails = &contexts
			}
		}
	}
	return pass, nil
}

// parseDetailsContexts returns the Context string of every detail context, in
// order.
func parseDetailsContexts(contexts []*textaudit.ScanTextResponseBodyDataElementsResultsDetailsContexts) []string {
	ctxs := make([]string, len(contexts))
	for i, c := range contexts {
		ctxs[i] = util.DerefString(c.Context)
	}
	return ctxs
}

// executeRollBack runs the rollback handler for lastValidTask and records the
// outcome on the task.
//
// The task's in-memory status is set to rollbacked before the handler runs.
// If the handler fails, the failure is logged and the task is stored as
// failed; otherwise it is stored as rollbacked. Only the error of that status
// update is returned: a handler failure whose status update succeeded yields
// nil.
func executeRollBack(lastValidTask *dbstruct.TextAuditTask) (err error) {
	ctx := &gin.Context{}
	lastValidTask.Status = goproto.Int64(consts.TextAudit_Rollbacked)

	status := consts.TextAudit_Rollbacked
	remarks := "任务审核失败,已将字段回退至之前的版本"
	if handleErr := _DefaultResultHandler.Handle(ctx, lastValidTask, consts.TextAuditTaskUpdate_Rollback); handleErr != nil {
		logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.Id), handleErr)
		status = consts.TextAudit_Failed
		remarks = "任务审核失败,回退失败,请联系管理员排查"
	}

	err = _DefaultTextAuditTask.OpUpdate(ctx, &textaudittaskproto.OpUpdateReq{
		TextAuditTask: &dbstruct.TextAuditTask{
			Id:      lastValidTask.Id,
			Status:  goproto.Int64(int64(status)),
			Remarks: goproto.String(remarks),
		},
	})
	if err != nil {
		logger.Error("_DefaultTextAuditTask OpUpdate fail: %v\n", err)
	}
	return err
}

// handleSuccess runs the success handler for task.
//
// The task's in-memory status is set to passed before the handler runs. If the
// handler fails, the failure is logged and the task is stored as failed; only
// the error of that status update is returned.
func handleSuccess(task *dbstruct.TextAuditTask) (err error) {
	ctx := &gin.Context{}
	task.Status = goproto.Int64(consts.TextAudit_Passed)

	handleErr := _DefaultResultHandler.Handle(ctx, task, consts.TextAuditTaskUpdate_Success)
	if handleErr == nil {
		return nil
	}
	logger.Error("Handle success taskId:%v fail:%v", util.DerefString(task.Id), handleErr)

	err = _DefaultTextAuditTask.OpUpdate(ctx, &textaudittaskproto.OpUpdateReq{
		TextAuditTask: &dbstruct.TextAuditTask{
			Id:      task.Id,
			Status:  goproto.Int64(consts.TextAudit_Failed),
			Remarks: goproto.String("任务审核成功,执行成功后操作失败,请联系管理员排查"),
		},
	})
	if err != nil {
		logger.Error("_DefaultTextAuditTask OpUpdate fail: %v\n", err)
	}
	return err
}

// updatePassedTasks stores the passed status for all tasks in passTaskIds.
// An empty id list is a no-op.
func updatePassedTasks(passTaskIds []string) (err error) {
	if len(passTaskIds) == 0 {
		return nil
	}
	ctx := &gin.Context{}
	err = _DefaultTextAuditTask.OpUpdateByIds(ctx, &textaudittaskproto.OpUpdateByIdsReq{
		TextAuditTask: &dbstruct.TextAuditTask{
			Status: goproto.Int64(consts.TextAudit_Passed),
		},
		Ids: passTaskIds,
	})
	if err != nil {
		logger.Error("_DefaultTextAuditTask OpUpdateByIds fail: %v\n", err)
	}
	return err
}

// updateExpiredTasks stores the expired status for all tasks in
// expiredTaskIds. An empty id list is a no-op.
func updateExpiredTasks(expiredTaskIds []string) (err error) {
	if len(expiredTaskIds) == 0 {
		return nil
	}
	ctx := &gin.Context{}
	err = _DefaultTextAuditTask.OpUpdateByIds(ctx, &textaudittaskproto.OpUpdateByIdsReq{
		TextAuditTask: &dbstruct.TextAuditTask{
			Status: goproto.Int64(consts.TextAudit_Expired),
		},
		Ids: expiredTaskIds,
	})
	if err != nil {
		logger.Error("_DefaultTextAuditTask OpUpdateByIds fail: %v\n", err)
	}
	return err
}

// handleBatchError marks a whole batch as failed after _err made it impossible
// to audit it.
//
// It stores the service-failed status on all audits and tasks of the batch
// (the task remarks carry _err's message) and then runs the rollback handler
// for every task; a task whose rollback fails is stored as failed.
//
// It returns early with the error if one of the two batch updates fails.
// Otherwise the returned error is the outcome of the last task processed:
// nil if its rollback succeeded, else the error of its status update.
func handleBatchError(ctrlBlock *TextAuditTaskBatchControlBlock, _err error) (err error) {
	logger.Info("All tasks of this batchId: %v has failed, rolling back...", ctrlBlock.BatchId)
	ctx := &gin.Context{}

	err = _DefaultTextAudit.OpUpdateByBatchId(ctx, ctrlBlock.BatchId, &dbstruct.TextAudit{
		Status:  goproto.Int64(consts.TextAudit_ServiceFailed),
		Remarks: goproto.String("批次任务失败,原因详见task表"),
	})
	if err != nil {
		logger.Error("_DefaultTextAudit OpUpdateByBatchId fail: %v\n", err)
		return err
	}

	err = _DefaultTextAuditTask.OpUpdateByBatchId(ctx, ctrlBlock.BatchId, &dbstruct.TextAuditTask{
		Status:  goproto.Int64(consts.TextAudit_ServiceFailed),
		Remarks: goproto.String(_err.Error()),
	})
	if err != nil {
		logger.Error("_DefaultTextAuditTask OpUpdateByBatchId fail: %v\n", err)
		return err
	}

	// Roll back every task of the batch.
	for _, taskCtrlBlock := range ctrlBlock.TaskCtrlBlocks {
		task := taskCtrlBlock.TextAuditTask
		task.Status = goproto.Int64(consts.TextAudit_ServiceFailed)

		err = _DefaultResultHandler.Handle(ctx, task, consts.TextAuditTaskUpdate_Rollback)
		if err == nil {
			continue
		}
		logger.Error("Roll back taskId:%v fail:%v", util.DerefString(task.Id), err)

		err = _DefaultTextAuditTask.OpUpdate(ctx, &textaudittaskproto.OpUpdateReq{
			TextAuditTask: &dbstruct.TextAuditTask{
				Id:      task.Id,
				Status:  goproto.Int64(consts.TextAudit_Failed),
				Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
			},
		})
		if err != nil {
			logger.Error("_DefaultTextAuditTask OpUpdate fail: %v\n", err)
		}
	}
	return err
}