by Robin at 20231225; task service fail
This commit is contained in:
parent
d70664c98b
commit
476f711921
|
@ -38,28 +38,30 @@ const (
|
||||||
|
|
||||||
// 图像审核表状态
|
// 图像审核表状态
|
||||||
const (
|
const (
|
||||||
ImageAudit_Created = 0 //已创建
|
ImageAudit_Created = 0 //已创建
|
||||||
ImageAudit_Auditing = 1 //审核中
|
ImageAudit_Auditing = 1 //审核中
|
||||||
ImageAudit_Passed = 2 //已通过
|
ImageAudit_Passed = 2 //已通过
|
||||||
ImageAudit_Rejected = 3 //已拒绝
|
ImageAudit_Rejected = 3 //已拒绝
|
||||||
ImageAudit_Rollbacked = 4 //已回退
|
ImageAudit_Rollbacked = 4 //已回退
|
||||||
ImageAudit_ManuallyPassed = 5 //已通过
|
ImageAudit_ManuallyPassed = 5 //已人工通过
|
||||||
ImageAudit_ManuallyRejected = 6 //已拒绝
|
ImageAudit_ManuallyRejected = 6 //已人工拒绝
|
||||||
ImageAudit_ManuallyRollbacked = 7 //已回退
|
ImageAudit_ManuallyRollbacked = 7 //已人工回退
|
||||||
ImageAudit_Expired = 8 //已失效
|
ImageAudit_Expired = 8 //已失效
|
||||||
ImageAudit_Failed = 9 //已失败
|
ImageAudit_Failed = 9 //已回退失败
|
||||||
|
ImageAudit_ServiceFailed = 10 //批次任务失败
|
||||||
)
|
)
|
||||||
|
|
||||||
// 文字审核表状态
|
// 文字审核表状态
|
||||||
const (
|
const (
|
||||||
TextAudit_Created = 0 //已创建
|
TextAudit_Created = 0 //已创建
|
||||||
TextAudit_Auditing = 1 //审核中
|
TextAudit_Auditing = 1 //审核中
|
||||||
TextAudit_Passed = 2 //已通过
|
TextAudit_Passed = 2 //已通过
|
||||||
TextAudit_Rejected = 3 //已拒绝
|
TextAudit_Rejected = 3 //已拒绝
|
||||||
TextAudit_Rollbacked = 4 //已回退
|
TextAudit_Rollbacked = 4 //已回退
|
||||||
TextAudit_ManuallyPassed = 5 //已通过
|
TextAudit_ManuallyPassed = 5 //已人工通过
|
||||||
TextAudit_ManuallyRejected = 6 //已拒绝
|
TextAudit_ManuallyRejected = 6 //已人工拒绝
|
||||||
TextAudit_ManuallyRollbacked = 7 //已回退
|
TextAudit_ManuallyRollbacked = 7 //已人工回退
|
||||||
TextAudit_Expired = 8 //已失效
|
TextAudit_Expired = 8 //已失效
|
||||||
TextAudit_Failed = 9 //处理失败
|
TextAudit_Failed = 9 //回退失败
|
||||||
|
TextAudit_ServiceFailed = 10 //批次任务失败
|
||||||
)
|
)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package imageaudit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
"service/api/consts"
|
"service/api/consts"
|
||||||
imageauditproto "service/api/proto/imageaudit/proto"
|
imageauditproto "service/api/proto/imageaudit/proto"
|
||||||
imageaudittaskproto "service/api/proto/imageaudittask/proto"
|
imageaudittaskproto "service/api/proto/imageaudittask/proto"
|
||||||
|
@ -28,9 +29,12 @@ func executeImageAuditTasks(tasks []*ImageAuditTaskControlBlock, batchId string)
|
||||||
logger.Info("Image audit batch started, batchId : %v, task number : %v\n", batchId, len(tasks))
|
logger.Info("Image audit batch started, batchId : %v, task number : %v\n", batchId, len(tasks))
|
||||||
|
|
||||||
// 1.创建请求
|
// 1.创建请求
|
||||||
req, taskMap, actionMap, err := createScanImageRequest(tasks, batchId)
|
// oss不在上海的服务器,需要调用Advance接口
|
||||||
|
//req, taskMap, actionMap, err := createScanImageRequest(tasks, batchId)
|
||||||
|
req, taskMap, actionMap, err := createScanImageAdvanceRequest(tasks, batchId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Info("Create Scan ImageRequest fail: %v", err)
|
logger.Info("Create Scan ImageRequest fail: %v", err)
|
||||||
|
handleBatchError(batchId, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,9 +42,12 @@ func executeImageAuditTasks(tasks []*ImageAuditTaskControlBlock, batchId string)
|
||||||
runtime := &teautils.RuntimeOptions{
|
runtime := &teautils.RuntimeOptions{
|
||||||
ConnectTimeout: tea.Int(30000),
|
ConnectTimeout: tea.Int(30000),
|
||||||
}
|
}
|
||||||
_result, err := defaultImageAuditClient.ScanImageWithOptions(req, runtime)
|
_result, err := defaultImageAuditClient.ScanImageAdvance(req, runtime)
|
||||||
|
//_result, err := defaultImageAuditClient.ScanImageWithOptions(req, runtime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("ScanImageWithOptions fail : %v", err)
|
logger.Error("ScanImageAdvance fail : %v", err)
|
||||||
|
handleBatchError(batchId, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3.处理应答
|
// 3.处理应答
|
||||||
|
@ -50,10 +57,11 @@ func executeImageAuditTasks(tasks []*ImageAuditTaskControlBlock, batchId string)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func createScanImageRequest(tasks []*ImageAuditTaskControlBlock, batchId string) (request *imageaudit.ScanImageRequest,
|
func createScanImageAdvanceRequest(tasks []*ImageAuditTaskControlBlock, batchId string) (request *imageaudit.ScanImageAdvanceRequest,
|
||||||
taskMap map[int]*ImageAuditTaskControlBlock, actionMap map[string]*ImageAuditAction, err error) {
|
taskMap map[int]*ImageAuditTaskControlBlock, actionMap map[string]*ImageAuditAction, err error) {
|
||||||
|
|
||||||
ctx := &gin.Context{}
|
ctx := &gin.Context{}
|
||||||
|
httpClient := http.Client{}
|
||||||
// todo:taskMap其实可以删掉
|
// todo:taskMap其实可以删掉
|
||||||
taskMap = make(map[int]*ImageAuditTaskControlBlock) // 图像审核索引号-task的map
|
taskMap = make(map[int]*ImageAuditTaskControlBlock) // 图像审核索引号-task的map
|
||||||
actionMap = make(map[string]*ImageAuditAction) // 动作Id号-action的map
|
actionMap = make(map[string]*ImageAuditAction) // 动作Id号-action的map
|
||||||
|
@ -77,27 +85,39 @@ func createScanImageRequest(tasks []*ImageAuditTaskControlBlock, batchId string)
|
||||||
}
|
}
|
||||||
mediafiller.FillList(ctx, images)
|
mediafiller.FillList(ctx, images)
|
||||||
|
|
||||||
// 2.打包图像url及图像审核id,转成ScanImageRequest
|
// 2.打包图像url及图像审核id,转成ScanImageAdvanceRequest
|
||||||
reqTasks := make([]*imageaudit.ScanImageRequestTask, 0)
|
reqTasks := make([]*imageaudit.ScanImageAdvanceRequestTask, 0)
|
||||||
for _, task := range tasks {
|
for _, task := range tasks {
|
||||||
if util.DerefInt64(task.ImageAuditTask.IsFragmented) == 1 {
|
if util.DerefInt64(task.ImageAuditTask.IsFragmented) == 1 {
|
||||||
imageauditIds := util.DerefStringSlice(task.ImageAuditTask.ImageAuditFragmentIds)
|
imageauditIds := util.DerefStringSlice(task.ImageAuditTask.ImageAuditFragmentIds)
|
||||||
for i := range task.Images {
|
for i := range task.Images {
|
||||||
reqTasks = append(reqTasks, &imageaudit.ScanImageRequestTask{
|
var file *http.Response
|
||||||
DataId: goproto.String(imageauditIds[i]),
|
file, err = httpClient.Get(task.Images[i].Images[0].Urls[0])
|
||||||
ImageURL: goproto.String(task.Images[i].Images[0].Urls[0]),
|
if err != nil {
|
||||||
|
logger.Error("httpClient Get fail : %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
reqTasks = append(reqTasks, &imageaudit.ScanImageAdvanceRequestTask{
|
||||||
|
DataId: goproto.String(imageauditIds[i]),
|
||||||
|
ImageURLObject: file.Body,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
reqTasks = append(reqTasks, &imageaudit.ScanImageRequestTask{
|
var file *http.Response
|
||||||
DataId: task.ImageAuditTask.ImageAuditId,
|
file, err = httpClient.Get(task.Images[0].Images[0].Urls[0])
|
||||||
ImageURL: goproto.String(task.Images[0].Images[0].Urls[0]),
|
if err != nil {
|
||||||
|
logger.Error("httpClient Get fail : %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
reqTasks = append(reqTasks, &imageaudit.ScanImageAdvanceRequestTask{
|
||||||
|
DataId: task.ImageAuditTask.ImageAuditId,
|
||||||
|
ImageURLObject: file.Body,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
request = &imageaudit.ScanImageRequest{
|
request = &imageaudit.ScanImageAdvanceRequest{
|
||||||
Scene: scenes,
|
Scene: scenes,
|
||||||
Task: reqTasks,
|
Task: reqTasks,
|
||||||
}
|
}
|
||||||
|
@ -121,7 +141,7 @@ func handleScanImageResponse(resp *imageaudit.ScanImageResponse, batchId string,
|
||||||
logger.Error("handleImageAudit fail: %v", err)
|
logger.Error("handleImageAudit fail: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3.取dataId前19位为taskId,反查得到那一次task
|
// 3.取出task
|
||||||
task := taskMap[i]
|
task := taskMap[i]
|
||||||
|
|
||||||
// 4.在task中记录本分片结果,并累积已到达的分片数,直到所有分片到达,决定该任务是否成功(非分片任务分片数为1)
|
// 4.在task中记录本分片结果,并累积已到达的分片数,直到所有分片到达,决定该任务是否成功(非分片任务分片数为1)
|
||||||
|
@ -336,3 +356,79 @@ func updateExpiredTasks(expiredTaskIds []string) (err error) {
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func handleBatchError(batchId string, _err error) (err error) {
|
||||||
|
logger.Info("All tasks of this batchId: %v has failed, rolling back...", batchId)
|
||||||
|
ctx := &gin.Context{}
|
||||||
|
if err = _DefaultImageAudit.OpUpdateByBatchId(ctx, batchId, &dbstruct.ImageAudit{
|
||||||
|
Status: goproto.Int64(consts.ImageAudit_ServiceFailed),
|
||||||
|
Remarks: goproto.String("批次任务失败,原因详见task表"),
|
||||||
|
}); err != nil {
|
||||||
|
logger.Error("_DefaultImageAudit OpUpdateByBatchId fail: %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = _DefaultImageAuditTask.OpUpdateByBatchId(ctx, batchId, &dbstruct.ImageAuditTask{
|
||||||
|
Status: goproto.Int64(consts.ImageAudit_ServiceFailed),
|
||||||
|
Remarks: goproto.String(_err.Error()),
|
||||||
|
}); err != nil {
|
||||||
|
logger.Error("_DefaultImageAuditTask OpUpdateByBatchId fail: %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// func createScanImageRequest(tasks []*ImageAuditTaskControlBlock, batchId string) (request *imageaudit.ScanImageRequest,
|
||||||
|
// taskMap map[int]*ImageAuditTaskControlBlock, actionMap map[string]*ImageAuditAction, err error) {
|
||||||
|
|
||||||
|
// ctx := &gin.Context{}
|
||||||
|
// // todo:taskMap其实可以删掉
|
||||||
|
// taskMap = make(map[int]*ImageAuditTaskControlBlock) // 图像审核索引号-task的map
|
||||||
|
// actionMap = make(map[string]*ImageAuditAction) // 动作Id号-action的map
|
||||||
|
|
||||||
|
// // 1.获取所有图像信息
|
||||||
|
// offset := 0
|
||||||
|
// images := make([]mediafiller.MediaFillable, 0)
|
||||||
|
// for _, task := range tasks {
|
||||||
|
// // 写map记录
|
||||||
|
// if actionMap[task.ActionId] == nil { // 写入actionMap
|
||||||
|
// actionMap[task.ActionId] = NewImageAuditAction()
|
||||||
|
// }
|
||||||
|
// actionMap[task.ActionId].Record(task)
|
||||||
|
|
||||||
|
// //获取图像
|
||||||
|
// for _, image := range task.Images {
|
||||||
|
// images = append(images, image)
|
||||||
|
// taskMap[offset] = task
|
||||||
|
// offset++
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// mediafiller.FillList(ctx, images)
|
||||||
|
|
||||||
|
// // 2.打包图像url及图像审核id,转成ScanImageRequest
|
||||||
|
// reqTasks := make([]*imageaudit.ScanImageRequestTask, 0)
|
||||||
|
// for _, task := range tasks {
|
||||||
|
// if util.DerefInt64(task.ImageAuditTask.IsFragmented) == 1 {
|
||||||
|
// imageauditIds := util.DerefStringSlice(task.ImageAuditTask.ImageAuditFragmentIds)
|
||||||
|
// for i := range task.Images {
|
||||||
|
// reqTasks = append(reqTasks, &imageaudit.ScanImageRequestTask{
|
||||||
|
// DataId: goproto.String(imageauditIds[i]),
|
||||||
|
// ImageURL: goproto.String(task.Images[i].Images[0].Urls[0]),
|
||||||
|
// })
|
||||||
|
// }
|
||||||
|
// } else {
|
||||||
|
// reqTasks = append(reqTasks, &imageaudit.ScanImageRequestTask{
|
||||||
|
// DataId: task.ImageAuditTask.ImageAuditId,
|
||||||
|
// ImageURL: goproto.String(task.Images[0].Images[0].Urls[0]),
|
||||||
|
// })
|
||||||
|
// }
|
||||||
|
|
||||||
|
// }
|
||||||
|
|
||||||
|
// request = &imageaudit.ScanImageRequest{
|
||||||
|
// Scene: scenes,
|
||||||
|
// Task: reqTasks,
|
||||||
|
// }
|
||||||
|
// logger.Info("本次打包:%v", reqTasks)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
|
@ -29,6 +29,7 @@ func executeTextAuditTasks(tasks []*TextAuditTaskControlBlock, batchId string) (
|
||||||
req, actionMap, err := createScanTextRequest(tasks, batchId)
|
req, actionMap, err := createScanTextRequest(tasks, batchId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Info("Create Scan TextRequest fail: %v", err)
|
logger.Info("Create Scan TextRequest fail: %v", err)
|
||||||
|
handleBatchError(batchId, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,6 +40,8 @@ func executeTextAuditTasks(tasks []*TextAuditTaskControlBlock, batchId string) (
|
||||||
_result, err := defaultTextAuditClient.ScanTextWithOptions(req, runtime)
|
_result, err := defaultTextAuditClient.ScanTextWithOptions(req, runtime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("ScanTextWithOptions fail : %v", err)
|
logger.Error("ScanTextWithOptions fail : %v", err)
|
||||||
|
handleBatchError(batchId, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3.处理应答
|
// 3.处理应答
|
||||||
|
@ -292,3 +295,24 @@ func updateExpiredTasks(expiredTaskIds []string) (err error) {
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func handleBatchError(batchId string, _err error) (err error) {
|
||||||
|
logger.Info("All tasks of this batchId: %v has failed, rolling back...", batchId)
|
||||||
|
ctx := &gin.Context{}
|
||||||
|
if err = _DefaultTextAudit.OpUpdateByBatchId(ctx, batchId, &dbstruct.TextAudit{
|
||||||
|
Status: goproto.Int64(consts.ImageAudit_ServiceFailed),
|
||||||
|
Remarks: goproto.String("批次任务失败,原因详见task表"),
|
||||||
|
}); err != nil {
|
||||||
|
logger.Error("_DefaultTextAudit OpUpdateByBatchId fail: %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = _DefaultTextAuditTask.OpUpdateByBatchId(ctx, batchId, &dbstruct.TextAuditTask{
|
||||||
|
Status: goproto.Int64(consts.ImageAudit_ServiceFailed),
|
||||||
|
Remarks: goproto.String(_err.Error()),
|
||||||
|
}); err != nil {
|
||||||
|
logger.Error("_DefaultTextAuditTask OpUpdateByBatchId fail: %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue