420 lines
16 KiB
Go
420 lines
16 KiB
Go
package imageaudit
|
||
|
||
import (
|
||
"encoding/json"
|
||
"net/http"
|
||
"service/api/consts"
|
||
imageauditproto "service/api/proto/imageaudit/proto"
|
||
imageaudittaskproto "service/api/proto/imageaudittask/proto"
|
||
"service/bizcommon/util"
|
||
"service/dbstruct"
|
||
"service/library/logger"
|
||
|
||
imageaudit "github.com/alibabacloud-go/imageaudit-20191230/v3/client"
|
||
teautils "github.com/alibabacloud-go/tea-utils/v2/service"
|
||
"github.com/alibabacloud-go/tea/tea"
|
||
goproto "google.golang.org/protobuf/proto"
|
||
|
||
"github.com/gin-gonic/gin"
|
||
)
|
||
|
||
// 刷新批次号
|
||
func RefreshBatchId() string {
|
||
batchId := defaultImageAuditTaskScheduler.batchId
|
||
defaultImageAuditTaskScheduler.batchId = genereteBatchId()
|
||
return batchId
|
||
}
|
||
|
||
// 图像审核主逻辑
|
||
func Run(batchId string) (successNum int, failNum int, err error) {
|
||
|
||
// 查询该批次所有审核任务
|
||
imageaudittasks, err := _DefaultImageAuditTask.OpList(&gin.Context{}, &imageaudittaskproto.OpListReq{
|
||
BatchId: goproto.String(batchId),
|
||
Status: goproto.Int64(consts.ImageAudit_Created),
|
||
Sort: "ct",
|
||
})
|
||
if err != nil {
|
||
logger.Info("_DefaultImageAuditTask OpList fail: %v", err)
|
||
}
|
||
|
||
if len(imageaudittasks) == 0 {
|
||
return 0, 0, nil
|
||
}
|
||
|
||
logger.Info("Image audit batch started, batchId : %v, task number : %v", batchId, len(imageaudittasks))
|
||
// 1.构建批量任务控制块,初始化必要的actionMap、填充送审image等信息
|
||
ctrlBlock := NewImageAuditTaskBatchControlBlock(imageaudittasks, batchId)
|
||
|
||
// 2.创建请求
|
||
// oss不在上海的服务器,需要调用Advance接口
|
||
req, err := createScanImageAdvanceRequest(ctrlBlock)
|
||
if err != nil {
|
||
logger.Info("Create Scan ImageRequest fail: %v", err)
|
||
handleBatchError(ctrlBlock, err)
|
||
failNum = len(imageaudittasks)
|
||
return
|
||
}
|
||
|
||
// 3.调用阿里的服务,收到应答
|
||
runtime := &teautils.RuntimeOptions{
|
||
ConnectTimeout: tea.Int(30000),
|
||
}
|
||
_result, err := defaultImageAuditClient.ScanImageAdvance(req, runtime)
|
||
if err != nil {
|
||
if _t, ok := err.(*tea.SDKError); ok {
|
||
logger.Error("ScanImageAdvance fail, errinfo : %v", util.DerefString(_t.Data))
|
||
}
|
||
handleBatchError(ctrlBlock, err)
|
||
failNum = len(imageaudittasks)
|
||
return
|
||
}
|
||
logger.Info("Receive the response from ScanImageAdvance: %v", _result.String())
|
||
|
||
// 4.处理应答
|
||
err = handleScanImageResponse(ctrlBlock, _result)
|
||
|
||
successNum = len(imageaudittasks)
|
||
return
|
||
}
|
||
|
||
func createScanImageAdvanceRequest(ctrlBlock *ImageAuditTaskBatchControlBlock) (request *imageaudit.ScanImageAdvanceRequest, err error) {
|
||
httpClient := http.Client{}
|
||
imageauditIds := ctrlBlock.ImageAuditIds
|
||
reqTasks := make([]*imageaudit.ScanImageAdvanceRequestTask, 0)
|
||
reqs := make([]*imageaudit.ScanImageRequestTask, 0)
|
||
for i, image := range ctrlBlock.Images {
|
||
var file *http.Response
|
||
file, err = httpClient.Get(image.Images[0].Urls[0])
|
||
if err != nil {
|
||
logger.Error("httpClient Get fail : %v", err)
|
||
return
|
||
}
|
||
|
||
reqTasks = append(reqTasks, &imageaudit.ScanImageAdvanceRequestTask{
|
||
DataId: goproto.String(imageauditIds[i]),
|
||
ImageURLObject: file.Body,
|
||
})
|
||
reqs = append(reqs, &imageaudit.ScanImageRequestTask{
|
||
DataId: goproto.String(imageauditIds[i]),
|
||
ImageURL: goproto.String(image.Images[0].Urls[0]),
|
||
})
|
||
}
|
||
|
||
request = &imageaudit.ScanImageAdvanceRequest{
|
||
Scene: scenes,
|
||
Task: reqTasks,
|
||
}
|
||
logger.Info("本次打包:%v", reqs)
|
||
return
|
||
}
|
||
|
||
func handleScanImageResponse(ctrlBlock *ImageAuditTaskBatchControlBlock, resp *imageaudit.ScanImageResponse) (err error) {
|
||
results := resp.Body.Data.Results
|
||
taskCtrlBlocks := ctrlBlock.TaskCtrlBlocks
|
||
img2taskIndexMap := ctrlBlock.Img2taskIndexMap
|
||
actionMap := ctrlBlock.ActionMap
|
||
for i, result := range results {
|
||
logger.Info("Handling %vs result...", i)
|
||
isTaskCompleted := false
|
||
isActionCompleted := false
|
||
action := &ImageAuditAction{}
|
||
|
||
// 1.dataId是imageaudit表主键Id, 唯一标识一次对单张图片的图像审核
|
||
logger.Info("Getting dataId...")
|
||
dataId := util.DerefString(result.DataId)
|
||
|
||
// 2.立即在imageaudit-imageaudit中更新该次审核结果
|
||
logger.Info("Handling its audit record...")
|
||
pass, err := handleImageAudit(dataId, result)
|
||
if err != nil {
|
||
logger.Error("handleImageAudit fail: %v", err)
|
||
}
|
||
|
||
// 3.取出task
|
||
logger.Info("Retriving its audit task...")
|
||
taskIndex := img2taskIndexMap[i]
|
||
task := taskCtrlBlocks[taskIndex]
|
||
|
||
// 4.在task中记录本分片结果,并累积已到达的分片数,直到所有分片到达,决定该任务是否成功(非分片任务分片数为1)
|
||
logger.Info("Recording it to task...")
|
||
isTaskCompleted = handleTask(task, pass)
|
||
|
||
// 5.通过task的actionId,去actionId-[]*task的map查出本批次对该字段的动作链,更新其中关于自己的状态
|
||
logger.Info("Recording the task result...")
|
||
if isTaskCompleted {
|
||
isActionCompleted, action = handleTaskAction(task, actionMap)
|
||
}
|
||
|
||
// 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
|
||
logger.Info("Recording the action result...")
|
||
if isActionCompleted {
|
||
if err = finalizeTask(action); err != nil {
|
||
logger.Error("finalizeTask fail: %v", err)
|
||
}
|
||
}
|
||
logger.Info("%vs result handled...", i)
|
||
}
|
||
return
|
||
}
|
||
|
||
func handleImageAudit(dataId string, result *imageaudit.ScanImageResponseBodyDataResults) (pass bool, err error) {
|
||
ctx := &gin.Context{}
|
||
imageaudit := &dbstruct.ImageAudit{
|
||
Id: goproto.String(dataId),
|
||
}
|
||
pass, err = copyScanResultInfo(imageaudit, result)
|
||
if err != nil {
|
||
logger.Error("Copy Scan Result Info fail: %v\n", err)
|
||
return
|
||
}
|
||
if err = _DefaultImageAudit.OpUpdate(ctx, &imageauditproto.OpUpdateReq{
|
||
ImageAudit: imageaudit,
|
||
}); err != nil {
|
||
logger.Error("_DefaultImageAudit OpUpdate fail: %v\n", err)
|
||
return
|
||
}
|
||
return
|
||
}
|
||
|
||
// 处理task,若task已分片,在task中记录本分片结果,并累积已到达的分片数,直到所有分片到达,决定该任务是否成功
|
||
func handleTask(task *ImageAuditTaskControlBlock, pass bool) (isTaskCompleted bool) {
|
||
task.ImageAuditTask.AuditedMediaResults = append(task.ImageAuditTask.AuditedMediaResults, pass)
|
||
task.IsTaskPassed = task.IsTaskPassed && pass
|
||
task.AuditedFragmentsNum++
|
||
|
||
return task.AuditedFragmentsNum == int(util.DerefInt64(task.ImageAuditTask.FragmentsNum))
|
||
}
|
||
|
||
// 通过task的actionId查出action, 通知该task已完成
|
||
func handleTaskAction(task *ImageAuditTaskControlBlock, actionMap map[string]*ImageAuditAction) (isActionCompleted bool, action *ImageAuditAction) {
|
||
action = actionMap[task.ActionId]
|
||
action.AuditedTaskNum++
|
||
|
||
isActionCompleted = action.TaskNum == action.AuditedTaskNum
|
||
return
|
||
|
||
}
|
||
|
||
// 等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
|
||
func finalizeTask(action *ImageAuditAction) (err error) {
|
||
|
||
lastValidTask, lastValidTaskIndex := action.TaskChain[0], 0 // 仅在动作链终态不成功时有意义,表示最后一个可以用以回退的失败任务和它的索引
|
||
passTaskIds := make([]string, 0) // 所有已审核成功的任务的taskId
|
||
expiredTaskIds := make([]string, 0) // 所有已失效任务的taskId,失效任务是指那些前驱任务依然失败的失败任务,或之后已有任务成功的失败任务,这些任务已失去实际意义,不可以用来回退
|
||
statuses := make([]int64, len(action.TaskChain)) // 所有任务的终态状态,成功,回退,或失效
|
||
isActionInPassedStatus := true // 当前动作链是否处于成功中
|
||
for i, task := range action.TaskChain {
|
||
if !task.IsTaskPassed { // 动作链检测到失败任务
|
||
if isActionInPassedStatus { // 若动作链处于成功状态,则这是目前为止,最后一个可以用于回退的任务
|
||
lastValidTask, lastValidTaskIndex = task, i
|
||
statuses[i] = consts.ImageAudit_Rollbacked
|
||
isActionInPassedStatus = false
|
||
} else { // 否则,若动作链处于失败状态,则该任务已失效
|
||
statuses[i] = consts.ImageAudit_Expired
|
||
expiredTaskIds = append(expiredTaskIds, util.DerefString(task.ImageAuditTask.Id))
|
||
}
|
||
} else { // 若动作链检测到成功任务,则重置动作链任务态,并将最后有效任务标志为失效(它已不再用于回退)
|
||
passTaskIds = append(passTaskIds, util.DerefString(task.ImageAuditTask.Id))
|
||
statuses[i] = consts.ImageAudit_Passed
|
||
isActionInPassedStatus = true
|
||
if statuses[lastValidTaskIndex] == consts.ImageAudit_Rollbacked {
|
||
statuses[lastValidTaskIndex] = consts.ImageAudit_Expired
|
||
expiredTaskIds = append(expiredTaskIds, util.DerefString(action.TaskChain[lastValidTaskIndex].ImageAuditTask.Id))
|
||
}
|
||
}
|
||
}
|
||
|
||
// 判定任务链终态,终态非成功,执行回退操作,记录到数据库中
|
||
if !action.IsPassed() {
|
||
if err = executeRollBack(lastValidTask.ImageAuditTask); err != nil {
|
||
return
|
||
}
|
||
}
|
||
|
||
// 更新所有任务状态
|
||
if err = updatePassedTasks(passTaskIds); err != nil {
|
||
return
|
||
}
|
||
if err = updateExpiredTasks(expiredTaskIds); err != nil {
|
||
return
|
||
}
|
||
|
||
// 终态成功,执行成功后操作
|
||
if action.IsPassed() {
|
||
if err = handleSuccess(action.TaskChain[len(action.TaskChain)-1].ImageAuditTask); err != nil {
|
||
return
|
||
}
|
||
}
|
||
|
||
logger.Info("action statuses : %v", statuses)
|
||
|
||
return
|
||
}
|
||
|
||
// 将图片审核结果copy到图片审核实体中
|
||
func copyScanResultInfo(imageaudit *dbstruct.ImageAudit, result *imageaudit.ScanImageResponseBodyDataResults) (pass bool, err error) {
|
||
|
||
pass = true
|
||
for _, subresult := range result.SubResults {
|
||
scene := ImageAuditScene(util.DerefString(subresult.Scene))
|
||
switch scene {
|
||
case Porn:
|
||
imageaudit.PornSceneSuggestion = subresult.Suggestion
|
||
imageaudit.PornSceneLabel = subresult.Label
|
||
imageaudit.PornSceneRate = goproto.Float64(float64(util.DerefFloat32(subresult.Rate)))
|
||
pass = pass && (util.StringsContains(PornPassLabels, util.DerefString(subresult.Label)))
|
||
case Terrorism:
|
||
imageaudit.TerrorismSceneSuggestion = subresult.Suggestion
|
||
imageaudit.TerrorismSceneLabel = subresult.Label
|
||
imageaudit.TerrorismSceneRate = goproto.Float64(float64(util.DerefFloat32(subresult.Rate)))
|
||
pass = pass && (util.StringsContains(TerrorismPassLabels, util.DerefString(subresult.Label)))
|
||
case Ad:
|
||
imageaudit.AdSceneSuggestion = subresult.Suggestion
|
||
imageaudit.AdSceneLabel = subresult.Label
|
||
imageaudit.AdSceneRate = goproto.Float64(float64(util.DerefFloat32(subresult.Rate)))
|
||
pass = pass && (util.StringsContains(AdPassLabels, util.DerefString(subresult.Label)))
|
||
case Live:
|
||
imageaudit.LiveSceneSuggestion = subresult.Suggestion
|
||
imageaudit.LiveSceneLabel = subresult.Label
|
||
imageaudit.LiveSceneRate = goproto.Float64(float64(util.DerefFloat32(subresult.Rate)))
|
||
pass = pass && (util.StringsContains(LivePassLabels, util.DerefString(subresult.Label)))
|
||
case Logo:
|
||
imageaudit.LogoSceneSuggestion = subresult.Suggestion
|
||
imageaudit.LogoSceneLabel = subresult.Label
|
||
imageaudit.LogoSceneRate = goproto.Float64(float64(util.DerefFloat32(subresult.Rate)))
|
||
pass = pass && (util.StringsContains(LogoPassLabels, util.DerefString(subresult.Label)))
|
||
}
|
||
|
||
}
|
||
|
||
if pass {
|
||
imageaudit.Status = goproto.Int64(consts.ImageAudit_Passed)
|
||
} else {
|
||
imageaudit.Status = goproto.Int64(consts.ImageAudit_Rejected)
|
||
var bytes []byte
|
||
bytes, err = json.Marshal(result)
|
||
if err != nil {
|
||
return
|
||
}
|
||
imageaudit.Details = &bytes
|
||
}
|
||
return
|
||
}
|
||
|
||
func executeRollBack(lastValidTask *dbstruct.ImageAuditTask) (err error) {
|
||
ctx := &gin.Context{}
|
||
lastValidTask.Status = goproto.Int64(consts.ImageAudit_Rollbacked)
|
||
if err = _DefaultResultHandler.Handle(ctx, lastValidTask, consts.ImageAuditTaskUpdate_Rollback); err != nil {
|
||
logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.Id), err)
|
||
if err = _DefaultImageAuditTask.OpUpdate(ctx, &imageaudittaskproto.OpUpdateReq{
|
||
ImageAuditTask: &dbstruct.ImageAuditTask{
|
||
Id: lastValidTask.Id,
|
||
Status: goproto.Int64(consts.ImageAudit_Failed),
|
||
Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
|
||
},
|
||
}); err != nil {
|
||
logger.Error("_DefaultImageAudit OpUpdate fail: %v\n", err)
|
||
}
|
||
return
|
||
}
|
||
if err = _DefaultImageAuditTask.OpUpdate(ctx, &imageaudittaskproto.OpUpdateReq{
|
||
ImageAuditTask: &dbstruct.ImageAuditTask{
|
||
Id: lastValidTask.Id,
|
||
Status: goproto.Int64(consts.ImageAudit_Rollbacked),
|
||
Remarks: goproto.String("任务审核失败,已将字段回退至之前的版本"),
|
||
},
|
||
}); err != nil {
|
||
logger.Error("_DefaultImageAudit OpUpdate fail: %v\n", err)
|
||
}
|
||
return
|
||
}
|
||
|
||
func handleSuccess(task *dbstruct.ImageAuditTask) (err error) {
|
||
ctx := &gin.Context{}
|
||
task.Status = goproto.Int64(consts.ImageAudit_Passed)
|
||
if err = _DefaultResultHandler.Handle(ctx, task, consts.ImageAuditTaskUpdate_Success); err != nil {
|
||
logger.Error("Handle success taskId:%v fail:%v", util.DerefString(task.Id), err)
|
||
if err = _DefaultImageAuditTask.OpUpdate(ctx, &imageaudittaskproto.OpUpdateReq{
|
||
ImageAuditTask: &dbstruct.ImageAuditTask{
|
||
Id: task.Id,
|
||
Status: goproto.Int64(consts.ImageAudit_Failed),
|
||
Remarks: goproto.String("任务审核成功,执行成功后操作失败,请联系管理员排查"),
|
||
},
|
||
}); err != nil {
|
||
logger.Error("_DefaultImageAuditTask OpUpdate fail: %v\n", err)
|
||
}
|
||
return
|
||
}
|
||
return
|
||
}
|
||
|
||
func updatePassedTasks(passTaskIds []string) (err error) {
|
||
ctx := &gin.Context{}
|
||
if err = _DefaultImageAuditTask.OpUpdateByIds(ctx, &imageaudittaskproto.OpUpdateByIdsReq{
|
||
ImageAuditTask: &dbstruct.ImageAuditTask{
|
||
Status: goproto.Int64(consts.ImageAudit_Passed),
|
||
Remarks: goproto.String("任务审核通过"),
|
||
},
|
||
Ids: passTaskIds,
|
||
}); err != nil {
|
||
logger.Error("_DefaultImageAudit OpUpdateByIds fail: %v\n", err)
|
||
return
|
||
}
|
||
return
|
||
}
|
||
|
||
func updateExpiredTasks(expiredTaskIds []string) (err error) {
|
||
ctx := &gin.Context{}
|
||
if err = _DefaultImageAuditTask.OpUpdateByIds(ctx, &imageaudittaskproto.OpUpdateByIdsReq{
|
||
ImageAuditTask: &dbstruct.ImageAuditTask{
|
||
Status: goproto.Int64(consts.ImageAudit_Expired),
|
||
Remarks: goproto.String("该任务审核未通过,但之前已有对同字段的审核任务失败,或之后有对同字段的审核任务成功,该任务已失效"),
|
||
},
|
||
Ids: expiredTaskIds,
|
||
}); err != nil {
|
||
logger.Error("_DefaultImageAudit OpUpdateByIds fail: %v\n", err)
|
||
return
|
||
}
|
||
return
|
||
}
|
||
|
||
func handleBatchError(ctrlBlock *ImageAuditTaskBatchControlBlock, _err error) (err error) {
|
||
logger.Info("All tasks of this batchId: %v has failed, rolling back...", ctrlBlock.BatchId)
|
||
ctx := &gin.Context{}
|
||
if err = _DefaultImageAudit.OpUpdateByBatchId(ctx, ctrlBlock.BatchId, &dbstruct.ImageAudit{
|
||
Status: goproto.Int64(consts.ImageAudit_ServiceFailed),
|
||
Remarks: goproto.String("批次任务失败,原因详见task表"),
|
||
}); err != nil {
|
||
logger.Error("_DefaultImageAudit OpUpdateByBatchId fail: %v\n", err)
|
||
return
|
||
}
|
||
|
||
if err = _DefaultImageAuditTask.OpUpdateByBatchId(ctx, ctrlBlock.BatchId, &dbstruct.ImageAuditTask{
|
||
Status: goproto.Int64(consts.ImageAudit_ServiceFailed),
|
||
Remarks: goproto.String(_err.Error()),
|
||
}); err != nil {
|
||
logger.Error("_DefaultImageAuditTask OpUpdateByBatchId fail: %v\n", err)
|
||
return
|
||
}
|
||
|
||
// 回退
|
||
for _, taskCtrlBlock := range ctrlBlock.TaskCtrlBlocks {
|
||
task := taskCtrlBlock.ImageAuditTask
|
||
task.Status = goproto.Int64(consts.ImageAudit_ServiceFailed)
|
||
if err = _DefaultResultHandler.Handle(ctx, task, consts.ImageAuditTaskUpdate_Rollback); err != nil {
|
||
if err = _DefaultImageAuditTask.OpUpdate(ctx, &imageaudittaskproto.OpUpdateReq{
|
||
ImageAuditTask: &dbstruct.ImageAuditTask{
|
||
Id: task.Id,
|
||
Status: goproto.Int64(consts.ImageAudit_Failed),
|
||
Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
|
||
},
|
||
}); err != nil {
|
||
logger.Error("_DefaultImageAudit OpUpdate fail: %v\n", err)
|
||
}
|
||
}
|
||
}
|
||
|
||
return
|
||
}
|