by Robin at 20240227;fix

This commit is contained in:
Leufolium 2024-02-27 01:05:14 +08:00
parent e570d4385d
commit c3db97077b
5 changed files with 12 additions and 43 deletions

View File

@ -114,7 +114,7 @@ server_info:
xxl_job: xxl_job:
server_addr: "http://172.16.0.174:9800/xxl-job-admin" server_addr: "http://172.16.0.174:9800/xxl-job-admin"
access_token: "default_token" access_token: "default_token"
executor_ip: "127.0.0.1" executor_ip: "172.16.0.177"
executor_port: "9801" executor_port: "9801"
registry_key: "golang-jobs-executor" registry_key: "golang-jobs-executor"
log_path: "/app/ironfan/log/xxl_job/" log_path: "/app/ironfan/log/xxl_job/"

View File

@ -42,9 +42,6 @@ func Run(batchId string) (successNum int, failNum int, err error) {
return 0, 0, nil return 0, 0, nil
} }
// 上锁
defaultImageAuditTaskScheduler.lock()
logger.Info("Image audit batch started, batchId : %v, task number : %v", batchId, len(imageaudittasks)) logger.Info("Image audit batch started, batchId : %v, task number : %v", batchId, len(imageaudittasks))
// 1.构建批量任务控制块初始化必要的actionMap、填充送审image等信息 // 1.构建批量任务控制块初始化必要的actionMap、填充送审image等信息
ctrlBlock := NewImageAuditTaskBatchControlBlock(imageaudittasks, batchId) ctrlBlock := NewImageAuditTaskBatchControlBlock(imageaudittasks, batchId)
@ -64,6 +61,7 @@ func Run(batchId string) (successNum int, failNum int, err error) {
ConnectTimeout: tea.Int(30000), ConnectTimeout: tea.Int(30000),
} }
_result, err := defaultImageAuditClient.ScanImageAdvance(req, runtime) _result, err := defaultImageAuditClient.ScanImageAdvance(req, runtime)
logger.Info("Receive the response from ScanImageAdvance: %v", _result.String())
if err != nil { if err != nil {
if _t, ok := err.(*tea.SDKError); ok { if _t, ok := err.(*tea.SDKError); ok {
logger.Error("ScanImageAdvance fail, errinfo : %v", util.DerefString(_t.Data)) logger.Error("ScanImageAdvance fail, errinfo : %v", util.DerefString(_t.Data))
@ -76,8 +74,6 @@ func Run(batchId string) (successNum int, failNum int, err error) {
// 4.处理应答 // 4.处理应答
err = handleScanImageResponse(ctrlBlock, _result) err = handleScanImageResponse(ctrlBlock, _result)
// 解锁
defaultImageAuditTaskScheduler.unLock()
successNum = len(imageaudittasks) successNum = len(imageaudittasks)
return return
} }
@ -119,37 +115,45 @@ func handleScanImageResponse(ctrlBlock *ImageAuditTaskBatchControlBlock, resp *i
img2taskIndexMap := ctrlBlock.Img2taskIndexMap img2taskIndexMap := ctrlBlock.Img2taskIndexMap
actionMap := ctrlBlock.ActionMap actionMap := ctrlBlock.ActionMap
for i, result := range results { for i, result := range results {
logger.Info("Handling %vs result...", i)
isTaskCompleted := false isTaskCompleted := false
isActionCompleted := false isActionCompleted := false
action := &ImageAuditAction{} action := &ImageAuditAction{}
// 1.dataId是imageaudit表主键Id, 唯一标识一次对单张图片的图像审核 // 1.dataId是imageaudit表主键Id, 唯一标识一次对单张图片的图像审核
logger.Info("Getting dataId...")
dataId := util.DerefString(result.DataId) dataId := util.DerefString(result.DataId)
// 2.立即在imageaudit-imageaudit中更新该次审核结果 // 2.立即在imageaudit-imageaudit中更新该次审核结果
logger.Info("Handling its audit record...")
pass, err := handleImageAudit(dataId, result) pass, err := handleImageAudit(dataId, result)
if err != nil { if err != nil {
logger.Error("handleImageAudit fail: %v", err) logger.Error("handleImageAudit fail: %v", err)
} }
// 3.取出task // 3.取出task
logger.Info("Retriving its audit task...")
taskIndex := img2taskIndexMap[i] taskIndex := img2taskIndexMap[i]
task := taskCtrlBlocks[taskIndex] task := taskCtrlBlocks[taskIndex]
// 4.在task中记录本分片结果并累积已到达的分片数直到所有分片到达决定该任务是否成功(非分片任务分片数为1) // 4.在task中记录本分片结果并累积已到达的分片数直到所有分片到达决定该任务是否成功(非分片任务分片数为1)
logger.Info("Recording it to task...")
isTaskCompleted = handleTask(task, pass) isTaskCompleted = handleTask(task, pass)
// 5.通过task的actionId去actionId-[]*task的map查出本批次对该字段的动作链更新其中关于自己的状态 // 5.通过task的actionId去actionId-[]*task的map查出本批次对该字段的动作链更新其中关于自己的状态
logger.Info("Recording the task result...")
if isTaskCompleted { if isTaskCompleted {
isActionCompleted, action = handleTaskAction(task, actionMap) isActionCompleted, action = handleTaskAction(task, actionMap)
} }
// 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 // 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功
logger.Info("Recording the action result...")
if isActionCompleted { if isActionCompleted {
if err = finalizeTask(action); err != nil { if err = finalizeTask(action); err != nil {
logger.Error("finalizeTask fail: %v", err) logger.Error("finalizeTask fail: %v", err)
} }
} }
logger.Info("%vs result handled...", i)
} }
return return
} }

View File

@ -10,29 +10,14 @@ var defaultImageAuditTaskScheduler *ImageAuditTaskScheduler
// 图像审核任务调度器 // 图像审核任务调度器
type ImageAuditTaskScheduler struct { type ImageAuditTaskScheduler struct {
// 同步标志
batchFlag chan bool // 批处理同步标志
// 状态记录 // 状态记录
batchId string // 当前批次号 batchId string // 当前批次号
} }
func initScheduler(cfg *configcenter.ImageAuditConfig) { func initScheduler(cfg *configcenter.ImageAuditConfig) {
defaultImageAuditTaskScheduler = &ImageAuditTaskScheduler{ defaultImageAuditTaskScheduler = &ImageAuditTaskScheduler{
batchFlag: make(chan bool, 1),
batchId: genereteBatchId(), batchId: genereteBatchId(),
} }
defaultImageAuditTaskScheduler.batchFlag <- true
}
// 批处理上锁
func (s *ImageAuditTaskScheduler) lock() {
<-s.batchFlag
}
// 批处理解锁
func (s *ImageAuditTaskScheduler) unLock() {
s.batchFlag <- true
} }
// 生成批次号 // 生成批次号

View File

@ -10,29 +10,14 @@ var defaultTextAuditTaskScheduler *TextAuditTaskScheduler
// 文字审核任务调度器 // 文字审核任务调度器
type TextAuditTaskScheduler struct { type TextAuditTaskScheduler struct {
// 缓冲池、同步标志
batchFlag chan bool // 批处理同步标志
// 状态记录 // 状态记录
batchId string // 当前批次号 batchId string // 当前批次号
} }
func initScheduler(cfg *configcenter.TextAuditConfig) { func initScheduler(cfg *configcenter.TextAuditConfig) {
defaultTextAuditTaskScheduler = &TextAuditTaskScheduler{ defaultTextAuditTaskScheduler = &TextAuditTaskScheduler{
batchFlag: make(chan bool, 1),
batchId: genereteBatchId(), batchId: genereteBatchId(),
} }
defaultTextAuditTaskScheduler.batchFlag <- true
}
// 批处理上锁
func (s *TextAuditTaskScheduler) lock() {
<-s.batchFlag
}
// 批处理解锁
func (s *TextAuditTaskScheduler) unLock() {
s.batchFlag <- true
} }
// 生成批次号 // 生成批次号

View File

@ -39,9 +39,6 @@ func Run(batchId string) (successNum int, failNum int, err error) {
return 0, 0, nil return 0, 0, nil
} }
// 上锁
defaultTextAuditTaskScheduler.lock()
logger.Info("Text audit batch started, batchId : %v, task number : %v", batchId, len(textaudittasks)) logger.Info("Text audit batch started, batchId : %v, task number : %v", batchId, len(textaudittasks))
// 1.构建批量任务控制块初始化必要的actionMap、填充送审text等信息 // 1.构建批量任务控制块初始化必要的actionMap、填充送审text等信息
ctrlBlock := NewTextAuditTaskBatchControlBlock(textaudittasks, batchId) ctrlBlock := NewTextAuditTaskBatchControlBlock(textaudittasks, batchId)
@ -68,8 +65,6 @@ func Run(batchId string) (successNum int, failNum int, err error) {
// 4.处理应答 // 4.处理应答
err = handleScanTextResponse(ctrlBlock, _result) err = handleScanTextResponse(ctrlBlock, _result)
// 解锁
defaultTextAuditTaskScheduler.unLock()
successNum = len(textaudittasks) successNum = len(textaudittasks)
return return
} }