service/library/contentaudit/imageaudit/imageaudit.go

435 lines
16 KiB
Go
Raw Normal View History

2023-12-21 22:17:40 +08:00
package imageaudit
import (
"encoding/json"
"net/http"
2023-12-21 22:17:40 +08:00
"service/api/consts"
imageauditproto "service/api/proto/imageaudit/proto"
imageaudittaskproto "service/api/proto/imageaudittask/proto"
"service/bizcommon/util"
"service/dbstruct"
"service/library/logger"
"service/library/mediafiller"
imageaudit "github.com/alibabacloud-go/imageaudit-20191230/v3/client"
teautils "github.com/alibabacloud-go/tea-utils/v2/service"
"github.com/alibabacloud-go/tea/tea"
goproto "google.golang.org/protobuf/proto"
"github.com/gin-gonic/gin"
)
2024-01-18 18:19:31 +08:00
// Run is the entry point of one image-audit round.
//
// As visible here it swaps the scheduler's current batchId for a freshly
// generated one, lists the tasks of the previous batchId that are still in
// the ImageAudit_Created status (sorted by "ct"), and then calls
// createImageAuditTaskControlBlocks between the scheduler's lock and unLock.
//
// NOTE(review): this block looks incomplete in this view: `tasks` is never
// declared, the error returned by OpList is never checked, `imageaudittasks`
// is never used, and there is no return statement. The bare timestamp lines
// are not Go either; they look like blame-view residue. Confirm against the
// real file before relying on this function.
func Run() (err error) {
// Rotate the batch id: remember the current one for this round and install a
// newly generated one on the scheduler.
batchId := defaultImageAuditTaskScheduler.batchId
defaultImageAuditTaskScheduler.batchId = genereteBatchId()
// List every audit task of that batch whose status is ImageAudit_Created.
imageaudittasks, err := _DefaultImageAuditTask.OpList(&gin.Context{}, &imageaudittaskproto.OpListReq{
BatchId: goproto.String(batchId),
Status: goproto.Int64(consts.ImageAudit_Created),
2024-01-19 17:31:38 +08:00
Sort: "ct",
2024-01-18 18:19:31 +08:00
})
2024-01-19 17:31:38 +08:00
// Take the scheduler lock.
defaultImageAuditTaskScheduler.lock()
createImageAuditTaskControlBlocks(tasks)
// Release the scheduler lock.
defaultImageAuditTaskScheduler.unLock()
2024-01-18 18:19:31 +08:00
}
2023-12-21 22:17:40 +08:00
// 图像审核主逻辑
func executeImageAuditTasks(tasks []*ImageAuditTaskControlBlock, batchId string) (err error) {
if len(tasks) == 0 {
return
}
logger.Info("Image audit batch started, batchId : %v, task number : %v\n", batchId, len(tasks))
// 1.创建请求
// oss不在上海的服务器需要调用Advance接口
//req, taskMap, actionMap, err := createScanImageRequest(tasks, batchId)
req, taskMap, actionMap, err := createScanImageAdvanceRequest(tasks, batchId)
2023-12-21 22:17:40 +08:00
if err != nil {
logger.Info("Create Scan ImageRequest fail: %v", err)
2023-12-28 22:46:14 +08:00
handleBatchError(tasks, batchId, err)
2023-12-21 22:17:40 +08:00
return
}
// 2.调用阿里的服务,收到应答
runtime := &teautils.RuntimeOptions{
ConnectTimeout: tea.Int(30000),
}
_result, err := defaultImageAuditClient.ScanImageAdvance(req, runtime)
//_result, err := defaultImageAuditClient.ScanImageWithOptions(req, runtime)
2023-12-21 22:17:40 +08:00
if err != nil {
2024-01-05 00:23:59 +08:00
if _t, ok := err.(*tea.SDKError); ok {
logger.Error("ScanImageAdvance fail, errinfo : %v", util.DerefString(_t.Data))
}
2023-12-28 22:46:14 +08:00
handleBatchError(tasks, batchId, err)
return
2023-12-21 22:17:40 +08:00
}
// 3.处理应答
err = handleScanImageResponse(_result, batchId, taskMap, actionMap)
return
}
func createScanImageAdvanceRequest(tasks []*ImageAuditTaskControlBlock, batchId string) (request *imageaudit.ScanImageAdvanceRequest,
2023-12-21 22:17:40 +08:00
taskMap map[int]*ImageAuditTaskControlBlock, actionMap map[string]*ImageAuditAction, err error) {
ctx := &gin.Context{}
httpClient := http.Client{}
2023-12-21 22:17:40 +08:00
// todo:taskMap其实可以删掉
taskMap = make(map[int]*ImageAuditTaskControlBlock) // 图像审核索引号-task的map
actionMap = make(map[string]*ImageAuditAction) // 动作Id号-action的map
// 1.获取所有图像信息
offset := 0
images := make([]mediafiller.MediaFillable, 0)
for _, task := range tasks {
// 写map记录
if actionMap[task.ActionId] == nil { // 写入actionMap
actionMap[task.ActionId] = NewImageAuditAction()
2024-01-04 21:55:00 +08:00
// 将此前批次对该元素未成功,待处理的审核全部置为已失效
if err := _DefaultImageAuditTask.OpHandleOverdue(ctx, task.ImageAuditTask, batchId); err != nil {
logger.Error("_DefaultImageAuditTask OpHandleOverdue fail :%v", err)
}
2023-12-21 22:17:40 +08:00
}
actionMap[task.ActionId].Record(task)
//获取图像
for _, image := range task.Images {
images = append(images, image)
taskMap[offset] = task
offset++
}
}
mediafiller.FillList(ctx, images)
// 2.打包图像url及图像审核id转成ScanImageAdvanceRequest
reqTasks := make([]*imageaudit.ScanImageAdvanceRequestTask, 0)
2024-01-01 19:24:32 +08:00
reqs := make([]*imageaudit.ScanImageRequestTask, 0)
2023-12-21 22:17:40 +08:00
for _, task := range tasks {
if util.DerefInt64(task.ImageAuditTask.IsFragmented) == 1 {
imageauditIds := util.DerefStringSlice(task.ImageAuditTask.ImageAuditFragmentIds)
for i := range task.Images {
var file *http.Response
file, err = httpClient.Get(task.Images[i].Images[0].Urls[0])
if err != nil {
logger.Error("httpClient Get fail : %v", err)
return
}
reqTasks = append(reqTasks, &imageaudit.ScanImageAdvanceRequestTask{
DataId: goproto.String(imageauditIds[i]),
ImageURLObject: file.Body,
2023-12-21 22:17:40 +08:00
})
2024-01-01 19:24:32 +08:00
reqs = append(reqs, &imageaudit.ScanImageRequestTask{
DataId: goproto.String(imageauditIds[i]),
ImageURL: goproto.String(task.Images[i].Images[0].Urls[0]),
})
2023-12-21 22:17:40 +08:00
}
} else {
var file *http.Response
file, err = httpClient.Get(task.Images[0].Images[0].Urls[0])
if err != nil {
logger.Error("httpClient Get fail : %v", err)
return
}
reqTasks = append(reqTasks, &imageaudit.ScanImageAdvanceRequestTask{
DataId: task.ImageAuditTask.ImageAuditId,
ImageURLObject: file.Body,
2023-12-21 22:17:40 +08:00
})
2024-01-01 19:24:32 +08:00
reqs = append(reqs, &imageaudit.ScanImageRequestTask{
DataId: task.ImageAuditTask.ImageAuditId,
ImageURL: goproto.String(task.Images[0].Images[0].Urls[0]),
})
2023-12-21 22:17:40 +08:00
}
}
request = &imageaudit.ScanImageAdvanceRequest{
2023-12-21 22:17:40 +08:00
Scene: scenes,
Task: reqTasks,
}
2024-01-01 19:24:32 +08:00
logger.Info("本次打包:%v", reqs)
2023-12-21 22:17:40 +08:00
return
}
func handleScanImageResponse(resp *imageaudit.ScanImageResponse, batchId string, taskMap map[int]*ImageAuditTaskControlBlock, actionMap map[string]*ImageAuditAction) (err error) {
results := resp.Body.Data.Results
for i, result := range results {
isTaskCompleted := false
isActionCompleted := false
action := &ImageAuditAction{}
// 1.dataId是imageaudit表主键Id, 唯一标识一次对单张图片的图像审核
dataId := util.DerefString(result.DataId)
// 2.立即在imageaudit-imageaudit中更新该次审核结果
pass, err := handleImageAudit(dataId, result)
if err != nil {
logger.Error("handleImageAudit fail: %v", err)
}
// 3.取出task
2023-12-21 22:17:40 +08:00
task := taskMap[i]
// 4.在task中记录本分片结果并累积已到达的分片数直到所有分片到达决定该任务是否成功(非分片任务分片数为1)
isTaskCompleted = handleTask(task, pass)
// 5.通过task的actionId去actionId-[]*task的map查出本批次对该字段的动作链更新其中关于自己的状态
if isTaskCompleted {
isActionCompleted, action = handleTaskAction(task, actionMap)
}
// 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
if isActionCompleted {
if err = finalizeTask(action); err != nil {
logger.Error("finalizeTask fail: %v", err)
}
}
}
return
}
// handleImageAudit stores the verdict of one scanned image.
//
// It builds an ImageAudit record keyed by dataId, fills it from result via
// copyScanResultInfo, and writes it with _DefaultImageAudit.OpUpdate. pass is
// the verdict computed by copyScanResultInfo; err is the first failure of
// either step (already logged here).
func handleImageAudit(dataId string, result *imageaudit.ScanImageResponseBodyDataResults) (pass bool, err error) {
	record := &dbstruct.ImageAudit{Id: goproto.String(dataId)}

	if pass, err = copyScanResultInfo(record, result); err != nil {
		logger.Error("Copy Scan Result Info fail: %v\n", err)
		return pass, err
	}

	updateReq := &imageauditproto.OpUpdateReq{ImageAudit: record}
	err = _DefaultImageAudit.OpUpdate(&gin.Context{}, updateReq)
	if err != nil {
		logger.Error("_DefaultImageAudit OpUpdate fail: %v\n", err)
	}
	return pass, err
}
// handleTask records the verdict of one fragment on its task.
//
// A failed fragment marks the whole task as not passed. The audited-fragment
// counter is incremented, and the task is reported complete when the counter
// equals the task's FragmentsNum.
func handleTask(task *ImageAuditTaskControlBlock, pass bool) (isTaskCompleted bool) {
	if !pass {
		task.IsTaskPassed = false
	}
	task.AuditedFragmentsNum++

	expected := int(util.DerefInt64(task.ImageAuditTask.FragmentsNum))
	isTaskCompleted = task.AuditedFragmentsNum == expected
	return isTaskCompleted
}
// handleTaskAction looks up the action of task by its ActionId and notifies it
// that one more task has completed. It returns the action together with a flag
// telling whether all of the action's tasks have now been audited.
func handleTaskAction(task *ImageAuditTaskControlBlock, actionMap map[string]*ImageAuditAction) (isActionCompleted bool, action *ImageAuditAction) {
	owner := actionMap[task.ActionId]
	owner.AuditedTaskNum++
	return owner.AuditedTaskNum == owner.TaskNum, owner
}
// finalizeTask decides the outcome of an action once every task in its chain
// has been audited.
//
// It walks action.TaskChain in order and classifies each task:
//   - a passed task is collected in passTaskIds;
//   - the first failed task after a passed stretch (or at the start) becomes
//     the rollback candidate;
//   - further failed tasks in the same failed stretch are expired;
//   - a rollback candidate that is later followed by a passed task is expired
//     as well, because it is no longer the one to roll back to.
//
// If action.IsPassed() reports false, the last rollback candidate is handed to
// executeRollBack. Passed and expired tasks are then persisted through
// updatePassedTasks and updateExpiredTasks. The statuses slice is only logged.
//
// The first error from executeRollBack, updatePassedTasks or
// updateExpiredTasks is returned immediately, skipping the remaining steps.
//
// NOTE(review): indexes action.TaskChain[0] unconditionally, so this assumes
// the chain is non-empty.
func finalizeTask(action *ImageAuditAction) (err error) {
lastValidTask := action.TaskChain[0] // only meaningful when the chain does not end passed: the failed task that gets rolled back
lastValidTaskIndex := 0 // index of lastValidTask in the chain
passTaskIds := make([]string, 0) // ids of all tasks that passed the audit
expiredTaskIds := make([]string, 0) // ids of expired tasks: failed tasks that are not (or no longer) the rollback candidate
statuses := make([]int64, len(action.TaskChain)) // final status per task: passed, rollbacked or expired
isActionInPassedStatus := true // whether the chain is currently in a passed stretch
for i, task := range action.TaskChain {
if !task.IsTaskPassed { // a failed task
if isActionInPassedStatus { // first failure of a stretch: so far this is the task to roll back
lastValidTask = task
lastValidTaskIndex = i
statuses[i] = consts.ImageAudit_Rollbacked
isActionInPassedStatus = false
} else { // already inside a failed stretch: this task is expired
statuses[i] = consts.ImageAudit_Expired
expiredTaskIds = append(expiredTaskIds, util.DerefString(task.ImageAuditTask.Id))
}
} else { // a passed task: the chain is passed again and the current rollback candidate, if any, is expired
passTaskIds = append(passTaskIds, util.DerefString(task.ImageAuditTask.Id))
statuses[i] = consts.ImageAudit_Passed
isActionInPassedStatus = true
if statuses[lastValidTaskIndex] == consts.ImageAudit_Rollbacked {
statuses[lastValidTaskIndex] = consts.ImageAudit_Expired
expiredTaskIds = append(expiredTaskIds, util.DerefString(action.TaskChain[lastValidTaskIndex].ImageAuditTask.Id))
}
}
}
// The action did not end passed: roll back using the last rollback candidate.
// executeRollBack also records the outcome on that task.
if !action.IsPassed() {
if err = executeRollBack(lastValidTask); err != nil {
return
}
}
// Persist the status of the passed and expired tasks.
if err = updatePassedTasks(passTaskIds); err != nil {
return
}
if err = updateExpiredTasks(expiredTaskIds); err != nil {
return
}
logger.Info("action statuses : %v", statuses)
return
}
// copyScanResultInfo copies the per-scene outcome of one scan result into the
// given ImageAudit record and computes the overall verdict.
//
// For each recognised scene (Porn, Terrorism, Ad, Live, Logo) the suggestion,
// label and rate are stored on the record; sub-results of any other scene are
// ignored. pass stays true only while every recognised scene's label is in
// that scene's pass-label list. The record's Status is set to Passed or
// Rejected accordingly; on rejection the whole result is additionally stored
// as JSON in Details, and a marshalling failure is returned as err.
func copyScanResultInfo(imageaudit *dbstruct.ImageAudit, result *imageaudit.ScanImageResponseBodyDataResults) (pass bool, err error) {
	pass = true
	for _, sub := range result.SubResults {
		label := util.DerefString(sub.Label)
		rate := goproto.Float64(float64(util.DerefFloat32(sub.Rate)))

		switch ImageAuditScene(util.DerefString(sub.Scene)) {
		case Porn:
			imageaudit.PornSceneSuggestion, imageaudit.PornSceneLabel, imageaudit.PornSceneRate = sub.Suggestion, sub.Label, rate
			pass = pass && util.StringsContains(PornPassLabels, label)
		case Terrorism:
			imageaudit.TerrorismSceneSuggestion, imageaudit.TerrorismSceneLabel, imageaudit.TerrorismSceneRate = sub.Suggestion, sub.Label, rate
			pass = pass && util.StringsContains(TerrorismPassLabels, label)
		case Ad:
			imageaudit.AdSceneSuggestion, imageaudit.AdSceneLabel, imageaudit.AdSceneRate = sub.Suggestion, sub.Label, rate
			pass = pass && util.StringsContains(AdPassLabels, label)
		case Live:
			imageaudit.LiveSceneSuggestion, imageaudit.LiveSceneLabel, imageaudit.LiveSceneRate = sub.Suggestion, sub.Label, rate
			pass = pass && util.StringsContains(LivePassLabels, label)
		case Logo:
			imageaudit.LogoSceneSuggestion, imageaudit.LogoSceneLabel, imageaudit.LogoSceneRate = sub.Suggestion, sub.Label, rate
			pass = pass && util.StringsContains(LogoPassLabels, label)
		}
	}

	if pass {
		imageaudit.Status = goproto.Int64(consts.ImageAudit_Passed)
		return pass, nil
	}

	// Rejected: keep the raw result so the reason can be inspected later.
	imageaudit.Status = goproto.Int64(consts.ImageAudit_Rejected)
	details, err := json.Marshal(result)
	if err != nil {
		return pass, err
	}
	imageaudit.Details = &details
	return pass, nil
}
// executeRollBack runs the rollback of lastValidTask and records the outcome
// on the task: ImageAudit_Rollbacked when the rollback succeeded,
// ImageAudit_Failed when it did not.
//
// The returned error is the result of persisting that status. A failed
// rollback is logged and recorded on the task but is not returned, which
// matches the previous behaviour and lets the caller go on updating the other
// tasks of the action.
// NOTE(review): confirm that swallowing the rollback error is intended.
func executeRollBack(lastValidTask *ImageAuditTaskControlBlock) (err error) {
	ctx := &gin.Context{}

	status := int64(consts.ImageAudit_Rollbacked)
	remarks := "任务审核失败,已将字段回退至之前的版本"
	if rollbackErr := lastValidTask.RollbackFunc(); rollbackErr != nil {
		logger.Error("Roll back taskId:%v fail:%v", util.DerefString(lastValidTask.ImageAuditTask.Id), rollbackErr)
		status = int64(consts.ImageAudit_Failed)
		remarks = "任务审核失败,回退失败,请联系管理员排查"
	}

	if err = _DefaultImageAuditTask.OpUpdate(ctx, &imageaudittaskproto.OpUpdateReq{
		ImageAuditTask: &dbstruct.ImageAuditTask{
			Id:      lastValidTask.ImageAuditTask.Id,
			Status:  goproto.Int64(status),
			Remarks: goproto.String(remarks),
		},
	}); err != nil {
		// The log used to name _DefaultImageAudit, which is not the service
		// being called here.
		logger.Error("_DefaultImageAuditTask OpUpdate fail: %v\n", err)
	}
	return
}
// updatePassedTasks marks every task in passTaskIds as ImageAudit_Passed.
// An empty id list is a no-op and does not touch the database. The error of
// the bulk update, if any, is logged and returned.
func updatePassedTasks(passTaskIds []string) (err error) {
	if len(passTaskIds) == 0 {
		return
	}
	ctx := &gin.Context{}
	if err = _DefaultImageAuditTask.OpUpdateByIds(ctx, &imageaudittaskproto.OpUpdateByIdsReq{
		ImageAuditTask: &dbstruct.ImageAuditTask{
			Status:  goproto.Int64(consts.ImageAudit_Passed),
			Remarks: goproto.String("任务审核通过"),
		},
		Ids: passTaskIds,
	}); err != nil {
		// The log used to name _DefaultImageAudit, which is not the service
		// being called here.
		logger.Error("_DefaultImageAuditTask OpUpdateByIds fail: %v\n", err)
	}
	return
}
// updateExpiredTasks marks every task in expiredTaskIds as ImageAudit_Expired.
// An empty id list is a no-op and does not touch the database. The error of
// the bulk update, if any, is logged and returned.
func updateExpiredTasks(expiredTaskIds []string) (err error) {
	if len(expiredTaskIds) == 0 {
		return
	}
	ctx := &gin.Context{}
	if err = _DefaultImageAuditTask.OpUpdateByIds(ctx, &imageaudittaskproto.OpUpdateByIdsReq{
		ImageAuditTask: &dbstruct.ImageAuditTask{
			Status:  goproto.Int64(consts.ImageAudit_Expired),
			Remarks: goproto.String("该任务审核未通过,但之前已有对同字段的审核任务失败,或之后有对同字段的审核任务成功,该任务已失效"),
		},
		Ids: expiredTaskIds,
	}); err != nil {
		// The log used to name _DefaultImageAudit, which is not the service
		// being called here.
		logger.Error("_DefaultImageAuditTask OpUpdateByIds fail: %v\n", err)
	}
	return
}
2023-12-28 22:46:14 +08:00
func handleBatchError(tasks []*ImageAuditTaskControlBlock, batchId string, _err error) (err error) {
logger.Info("All tasks of this batchId: %v has failed, rolling back...", batchId)
ctx := &gin.Context{}
if err = _DefaultImageAudit.OpUpdateByBatchId(ctx, batchId, &dbstruct.ImageAudit{
Status: goproto.Int64(consts.ImageAudit_ServiceFailed),
Remarks: goproto.String("批次任务失败原因详见task表"),
}); err != nil {
logger.Error("_DefaultImageAudit OpUpdateByBatchId fail: %v\n", err)
return
}
if err = _DefaultImageAuditTask.OpUpdateByBatchId(ctx, batchId, &dbstruct.ImageAuditTask{
Status: goproto.Int64(consts.ImageAudit_ServiceFailed),
Remarks: goproto.String(_err.Error()),
}); err != nil {
logger.Error("_DefaultImageAuditTask OpUpdateByBatchId fail: %v\n", err)
return
}
2023-12-28 22:46:14 +08:00
// 回退
for _, task := range tasks {
if err = task.RollbackFunc(); err != nil {
if err = _DefaultImageAuditTask.OpUpdate(ctx, &imageaudittaskproto.OpUpdateReq{
ImageAuditTask: &dbstruct.ImageAuditTask{
Id: task.ImageAuditTask.Id,
Status: goproto.Int64(consts.ImageAudit_Failed),
Remarks: goproto.String("任务审核失败,回退失败,请联系管理员排查"),
},
}); err != nil {
logger.Error("_DefaultImageAudit OpUpdate fail: %v\n", err)
}
}
}
return
}