package imageaudit import ( "encoding/json" "net/http" "service/api/consts" imageauditproto "service/api/proto/imageaudit/proto" imageaudittaskproto "service/api/proto/imageaudittask/proto" "service/bizcommon/util" "service/dbstruct" "service/library/logger" "service/library/mediafiller" imageaudit "github.com/alibabacloud-go/imageaudit-20191230/v3/client" teautils "github.com/alibabacloud-go/tea-utils/v2/service" "github.com/alibabacloud-go/tea/tea" goproto "google.golang.org/protobuf/proto" "github.com/gin-gonic/gin" ) // 图像审核主逻辑 func executeImageAuditTasks(tasks []*ImageAuditTaskControlBlock, batchId string) (err error) { if len(tasks) == 0 { return } logger.Info("Image audit batch started, batchId : %v, task number : %v\n", batchId, len(tasks)) // 1.创建请求 // oss不在上海的服务器,需要调用Advance接口 //req, taskMap, actionMap, err := createScanImageRequest(tasks, batchId) req, taskMap, actionMap, err := createScanImageAdvanceRequest(tasks, batchId) if err != nil { logger.Info("Create Scan ImageRequest fail: %v", err) handleBatchError(batchId, err) return } // 2.调用阿里的服务,收到应答 runtime := &teautils.RuntimeOptions{ ConnectTimeout: tea.Int(30000), } _result, err := defaultImageAuditClient.ScanImageAdvance(req, runtime) //_result, err := defaultImageAuditClient.ScanImageWithOptions(req, runtime) if err != nil { logger.Error("ScanImageAdvance fail : %v", err) handleBatchError(batchId, err) return } // 3.处理应答 err = handleScanImageResponse(_result, batchId, taskMap, actionMap) return } func createScanImageAdvanceRequest(tasks []*ImageAuditTaskControlBlock, batchId string) (request *imageaudit.ScanImageAdvanceRequest, taskMap map[int]*ImageAuditTaskControlBlock, actionMap map[string]*ImageAuditAction, err error) { ctx := &gin.Context{} httpClient := http.Client{} // todo:taskMap其实可以删掉 taskMap = make(map[int]*ImageAuditTaskControlBlock) // 图像审核索引号-task的map actionMap = make(map[string]*ImageAuditAction) // 动作Id号-action的map // 1.获取所有图像信息 offset := 0 images := make([]mediafiller.MediaFillable, 0) for _, task := range tasks { // 写map记录 if actionMap[task.ActionId] == nil { // 写入actionMap actionMap[task.ActionId] = NewImageAuditAction() } actionMap[task.ActionId].Record(task) //获取图像 for _, image := range task.Images { images = append(images, image) taskMap[offset] = task offset++ } } mediafiller.FillList(ctx, images) // 2.打包图像url及图像审核id,转成ScanImageAdvanceRequest reqTasks := make([]*imageaudit.ScanImageAdvanceRequestTask, 0) for _, task := range tasks { if util.DerefInt64(task.ImageAuditTask.IsFragmented) == 1 { imageauditIds := util.DerefStringSlice(task.ImageAuditTask.ImageAuditFragmentIds) for i := range task.Images { var file *http.Response file, err = httpClient.Get(task.Images[i].Images[0].Urls[0]) if err != nil { logger.Error("httpClient Get fail : %v", err) return } reqTasks = append(reqTasks, &imageaudit.ScanImageAdvanceRequestTask{ DataId: goproto.String(imageauditIds[i]), ImageURLObject: file.Body, }) } } else { var file *http.Response file, err = httpClient.Get(task.Images[0].Images[0].Urls[0]) if err != nil { logger.Error("httpClient Get fail : %v", err) return } reqTasks = append(reqTasks, &imageaudit.ScanImageAdvanceRequestTask{ DataId: task.ImageAuditTask.ImageAuditId, ImageURLObject: file.Body, }) } } request = &imageaudit.ScanImageAdvanceRequest{ Scene: scenes, Task: reqTasks, } logger.Info("本次打包:%v", reqTasks) return } func handleScanImageResponse(resp *imageaudit.ScanImageResponse, batchId string, taskMap map[int]*ImageAuditTaskControlBlock, actionMap map[string]*ImageAuditAction) (err error) { results := resp.Body.Data.Results for i, result := range results { isTaskCompleted := false isActionCompleted := false action := &ImageAuditAction{} // 1.dataId是imageaudit表主键Id, 唯一标识一次对单张图片的图像审核 dataId := util.DerefString(result.DataId) // 2.立即在imageaudit-imageaudit中更新该次审核结果 pass, err := handleImageAudit(dataId, result) if err != nil { logger.Error("handleImageAudit fail: %v", err) } // 3.取出task task := taskMap[i] // 4.在task中记录本分片结果,并累积已到达的分片数,直到所有分片到达,决定该任务是否成功(非分片任务分片数为1) isTaskCompleted = handleTask(task, pass) // 5.通过task的actionId,去actionId-[]*task的map查出本批次对该字段的动作链,更新其中关于自己的状态 if isTaskCompleted { isActionCompleted, action = handleTaskAction(task, actionMap) } // 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 if isActionCompleted { if err = finalizeTask(action); err != nil { logger.Error("finalizeTask fail: %v", err) } } } return } func handleImageAudit(dataId string, result *imageaudit.ScanImageResponseBodyDataResults) (pass bool, err error) { ctx := &gin.Context{} imageaudit := &dbstruct.ImageAudit{ Id: goproto.String(dataId), } pass, err = copyScanResultInfo(imageaudit, result) if err != nil { logger.Error("Copy Scan Result Info fail: %v\n", err) return } if err = _DefaultImageAudit.OpUpdate(ctx, &imageauditproto.OpUpdateReq{ ImageAudit: imageaudit, }); err != nil { logger.Error("_DefaultImageAudit OpUpdate fail: %v\n", err) return } return } // 处理task,若task已分片,在task中记录本分片结果,并累积已到达的分片数,直到所有分片到达,决定该任务是否成功 func handleTask(task *ImageAuditTaskControlBlock, pass bool) (isTaskCompleted bool) { task.IsTaskPassed = task.IsTaskPassed && pass task.AuditedFragmentsNum++ return task.AuditedFragmentsNum == int(util.DerefInt64(task.ImageAuditTask.FragmentsNum)) } // 通过task的actionId查出action, 通知该task已完成 func handleTaskAction(task *ImageAuditTaskControlBlock, actionMap map[string]*ImageAuditAction) (isActionCompleted bool, action *ImageAuditAction) { action = actionMap[task.ActionId] action.AuditedTaskNum++ isActionCompleted = action.TaskNum == action.AuditedTaskNum return } // 等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 func finalizeTask(action *ImageAuditAction) (err error) { lastValidTask := action.TaskChain[0] // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务 lastValidTaskIndex := 0 // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务的索引 passTaskIds := make([]string, 0) // 所有已审核成功的任务的taskId expiredTaskIds := make([]string, 0) // 所有已失效任务的taskId,失效任务是指那些前驱任务依然失败的失败任务,这些任务已失去实际意义,不可以用来回退 statuses := make([]int64, len(action.TaskChain)) // 所有任务的终态状态,成功,回退,或失效 isActionInPassedStatus := true // 当前动作链是否处于成功中 for i, task := range action.TaskChain { if !task.IsTaskPassed { // 动作链检测到失败任务 if isActionInPassedStatus { // 若动作链处于成功状态,则这是目前为止,最后一个可以用于回退的任务 lastValidTask = task lastValidTaskIndex = i statuses[i] = consts.ImageAudit_Rollbacked isActionInPassedStatus = false } else { // 否则,若动作链处于失败状态,则该任务已失效 statuses[i] = consts.ImageAudit_Expired expiredTaskIds = append(expiredTaskIds, util.DerefString(task.ImageAuditTask.Id)) } } else { // 若动作链检测到成功任务,则重置动作链任务态,并将最后有效任务标志为失效(它已不再用于回退) passTaskIds = append(passTaskIds, util.DerefString(task.ImageAuditTask.Id)) statuses[i] = consts.ImageAudit_Passed isActionInPassedStatus = true if statuses[lastValidTaskIndex] == consts.ImageAudit_Rollbacked { statuses[lastValidTaskIndex] = consts.ImageAudit_Expired expiredTaskIds = append(expiredTaskIds, util.DerefString(action.TaskChain[lastValidTaskIndex].ImageAuditTask.Id)) } } } // 判定任务链终态,终态非成功,执行回退操作,记录到数据库中 if !action.IsPassed() { if err = executeRollBack(lastValidTask); err != nil { return } } // 更新所有任务状态 if err = updatePassedTasks(passTaskIds); err != nil { return } if err = updateExpiredTasks(expiredTaskIds); err != nil { return } logger.Info("action statuses : %v", statuses) return } // 将图片审核结果copy到图片审核实体中 func copyScanResultInfo(imageaudit *dbstruct.ImageAudit, result *imageaudit.ScanImageResponseBodyDataResults) (pass bool, err error) { pass = true for _, subresult := range result.SubResults { scene := ImageAuditScene(util.DerefString(subresult.Scene)) switch scene { case Porn: imageaudit.PornSceneSuggestion = subresult.Suggestion imageaudit.PornSceneLabel = subresult.Label imageaudit.PornSceneRate = goproto.Float64(float64(util.DerefFloat32(subresult.Rate))) pass = pass && (util.StringsContains(PornPassLabels, util.DerefString(subresult.Label))) case Terrorism: imageaudit.TerrorismSceneSuggestion = subresult.Suggestion imageaudit.TerrorismSceneLabel = subresult.Label imageaudit.TerrorismSceneRate = goproto.Float64(float64(util.DerefFloat32(subresult.Rate))) pass = pass && (util.StringsContains(TerrorismPassLabels, util.DerefString(subresult.Label))) case Ad: imageaudit.AdSceneSuggestion = subresult.Suggestion imageaudit.AdSceneLabel = subresult.Label imageaudit.AdSceneRate = goproto.Float64(float64(util.DerefFloat32(subresult.Rate))) pass = pass && (util.StringsContains(AdPassLabels, util.DerefString(subresult.Label))) case Live: imageaudit.LiveSceneSuggestion = subresult.Suggestion imageaudit.LiveSceneLabel = subresult.Label imageaudit.LiveSceneRate = goproto.Float64(float64(util.DerefFloat32(subresult.Rate))) pass = pass && (util.StringsContains(LivePassLabels, util.DerefString(subresult.Label))) case Logo: imageaudit.LogoSceneSuggestion = subresult.Suggestion imageaudit.LogoSceneLabel = subresult.Label imageaudit.LogoSceneRate = goproto.Float64(float64(util.DerefFloat32(subresult.Rate))) pass = pass && (util.StringsContains(LogoPassLabels, util.DerefString(subresult.Label))) } } if pass { imageaudit.Status = goproto.Int64(consts.ImageAudit_Passed) } else { imageaudit.Status = goproto.Int64(consts.ImageAudit_Rejected) var bytes []byte bytes, err = json.Marshal(result) if err != nil { return } imageaudit.Details = &bytes } return } func executeRollBack(lastValidTask *ImageAuditTaskControlBlock) (err error) { ctx := &gin.Context{} if err = lastValidTask.RollbackFunc(); err != nil { logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.ImageAuditTask.Id), err) if err = _DefaultImageAuditTask.OpUpdate(ctx, &imageaudittaskproto.OpUpdateReq{ ImageAuditTask: &dbstruct.ImageAuditTask{ Id: lastValidTask.ImageAuditTask.Id, Status: goproto.Int64(consts.ImageAudit_Failed), Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"), }, }); err != nil { logger.Error("_DefaultImageAudit OpUpdate fail: %v\n", err) } return } if err = _DefaultImageAuditTask.OpUpdate(ctx, &imageaudittaskproto.OpUpdateReq{ ImageAuditTask: &dbstruct.ImageAuditTask{ Id: lastValidTask.ImageAuditTask.Id, Status: goproto.Int64(consts.ImageAudit_Rollbacked), Remarks: goproto.String("任务审核失败,已将字段回退至之前的版本"), }, }); err != nil { logger.Error("_DefaultImageAudit OpUpdate fail: %v\n", err) } return } func updatePassedTasks(passTaskIds []string) (err error) { ctx := &gin.Context{} if err = _DefaultImageAuditTask.OpUpdateByIds(ctx, &imageaudittaskproto.OpUpdateByIdsReq{ ImageAuditTask: &dbstruct.ImageAuditTask{ Status: goproto.Int64(consts.ImageAudit_Passed), Remarks: goproto.String("任务审核通过"), }, Ids: passTaskIds, }); err != nil { logger.Error("_DefaultImageAudit OpUpdateByIds fail: %v\n", err) return } return } func updateExpiredTasks(expiredTaskIds []string) (err error) { ctx := &gin.Context{} if err = _DefaultImageAuditTask.OpUpdateByIds(ctx, &imageaudittaskproto.OpUpdateByIdsReq{ ImageAuditTask: &dbstruct.ImageAuditTask{ Status: goproto.Int64(consts.ImageAudit_Expired), Remarks: goproto.String("该任务审核未通过,但之前已有对同字段的审核任务失败,或之后有对同字段的审核任务成功,该任务已失效"), }, Ids: expiredTaskIds, }); err != nil { logger.Error("_DefaultImageAudit OpUpdateByIds fail: %v\n", err) return } return } func handleBatchError(batchId string, _err error) (err error) { logger.Info("All tasks of this batchId: %v has failed, rolling back...", batchId) ctx := &gin.Context{} if err = _DefaultImageAudit.OpUpdateByBatchId(ctx, batchId, &dbstruct.ImageAudit{ Status: goproto.Int64(consts.ImageAudit_ServiceFailed), Remarks: goproto.String("批次任务失败,原因详见task表"), }); err != nil { logger.Error("_DefaultImageAudit OpUpdateByBatchId fail: %v\n", err) return } if err = _DefaultImageAuditTask.OpUpdateByBatchId(ctx, batchId, &dbstruct.ImageAuditTask{ Status: goproto.Int64(consts.ImageAudit_ServiceFailed), Remarks: goproto.String(_err.Error()), }); err != nil { logger.Error("_DefaultImageAuditTask OpUpdateByBatchId fail: %v\n", err) return } return } // func createScanImageRequest(tasks []*ImageAuditTaskControlBlock, batchId string) (request *imageaudit.ScanImageRequest, // taskMap map[int]*ImageAuditTaskControlBlock, actionMap map[string]*ImageAuditAction, err error) { // ctx := &gin.Context{} // // todo:taskMap其实可以删掉 // taskMap = make(map[int]*ImageAuditTaskControlBlock) // 图像审核索引号-task的map // actionMap = make(map[string]*ImageAuditAction) // 动作Id号-action的map // // 1.获取所有图像信息 // offset := 0 // images := make([]mediafiller.MediaFillable, 0) // for _, task := range tasks { // // 写map记录 // if actionMap[task.ActionId] == nil { // 写入actionMap // actionMap[task.ActionId] = NewImageAuditAction() // } // actionMap[task.ActionId].Record(task) // //获取图像 // for _, image := range task.Images { // images = append(images, image) // taskMap[offset] = task // offset++ // } // } // mediafiller.FillList(ctx, images) // // 2.打包图像url及图像审核id,转成ScanImageRequest // reqTasks := make([]*imageaudit.ScanImageRequestTask, 0) // for _, task := range tasks { // if util.DerefInt64(task.ImageAuditTask.IsFragmented) == 1 { // imageauditIds := util.DerefStringSlice(task.ImageAuditTask.ImageAuditFragmentIds) // for i := range task.Images { // reqTasks = append(reqTasks, &imageaudit.ScanImageRequestTask{ // DataId: goproto.String(imageauditIds[i]), // ImageURL: goproto.String(task.Images[i].Images[0].Urls[0]), // }) // } // } else { // reqTasks = append(reqTasks, &imageaudit.ScanImageRequestTask{ // DataId: task.ImageAuditTask.ImageAuditId, // ImageURL: goproto.String(task.Images[0].Images[0].Urls[0]), // }) // } // } // request = &imageaudit.ScanImageRequest{ // Scene: scenes, // Task: reqTasks, // } // logger.Info("本次打包:%v", reqTasks) // return // }