diff --git a/etc/mix/mix-prod.yaml b/etc/mix/mix-prod.yaml index 10795151..3f6d1813 100644 --- a/etc/mix/mix-prod.yaml +++ b/etc/mix/mix-prod.yaml @@ -114,7 +114,7 @@ server_info: xxl_job: server_addr: "http://172.16.0.174:9800/xxl-job-admin" access_token: "default_token" - executor_ip: "127.0.0.1" + executor_ip: "172.16.0.177" executor_port: "9801" registry_key: "golang-jobs-executor" log_path: "/app/ironfan/log/xxl_job/" \ No newline at end of file diff --git a/library/contentaudit/imageaudit/imageaudit.go b/library/contentaudit/imageaudit/imageaudit.go index f350f95f..1bfa5caa 100644 --- a/library/contentaudit/imageaudit/imageaudit.go +++ b/library/contentaudit/imageaudit/imageaudit.go @@ -42,9 +42,6 @@ func Run(batchId string) (successNum int, failNum int, err error) { return 0, 0, nil } - // 上锁 - defaultImageAuditTaskScheduler.lock() - logger.Info("Image audit batch started, batchId : %v, task number : %v", batchId, len(imageaudittasks)) // 1.构建批量任务控制块,初始化必要的actionMap、填充送审image等信息 ctrlBlock := NewImageAuditTaskBatchControlBlock(imageaudittasks, batchId) @@ -64,6 +61,7 @@ func Run(batchId string) (successNum int, failNum int, err error) { ConnectTimeout: tea.Int(30000), } _result, err := defaultImageAuditClient.ScanImageAdvance(req, runtime) + logger.Info("Receive the response from ScanImageAdvance: %v", _result.String()) if err != nil { if _t, ok := err.(*tea.SDKError); ok { logger.Error("ScanImageAdvance fail, errinfo : %v", util.DerefString(_t.Data)) @@ -76,8 +74,6 @@ func Run(batchId string) (successNum int, failNum int, err error) { // 4.处理应答 err = handleScanImageResponse(ctrlBlock, _result) - // 解锁 - defaultImageAuditTaskScheduler.unLock() successNum = len(imageaudittasks) return } @@ -119,37 +115,45 @@ func handleScanImageResponse(ctrlBlock *ImageAuditTaskBatchControlBlock, resp *i img2taskIndexMap := ctrlBlock.Img2taskIndexMap actionMap := ctrlBlock.ActionMap for i, result := range results { + logger.Info("Handling %vs result...", i) isTaskCompleted := false isActionCompleted := false action := &ImageAuditAction{} // 1.dataId是imageaudit表主键Id, 唯一标识一次对单张图片的图像审核 + logger.Info("Getting dataId...") dataId := util.DerefString(result.DataId) // 2.立即在imageaudit-imageaudit中更新该次审核结果 + logger.Info("Handling its audit record...") pass, err := handleImageAudit(dataId, result) if err != nil { logger.Error("handleImageAudit fail: %v", err) } // 3.取出task + logger.Info("Retriving its audit task...") taskIndex := img2taskIndexMap[i] task := taskCtrlBlocks[taskIndex] // 4.在task中记录本分片结果,并累积已到达的分片数,直到所有分片到达,决定该任务是否成功(非分片任务分片数为1) + logger.Info("Recording it to task...") isTaskCompleted = handleTask(task, pass) // 5.通过task的actionId,去actionId-[]*task的map查出本批次对该字段的动作链,更新其中关于自己的状态 + logger.Info("Recording the task result...") if isTaskCompleted { isActionCompleted, action = handleTaskAction(task, actionMap) } // 6.等待所有动作链元素均已抵达,判定本批次对该字段的操作是否成功 + logger.Info("Recording the action result...") if isActionCompleted { if err = finalizeTask(action); err != nil { logger.Error("finalizeTask fail: %v", err) } } + logger.Info("%vs result handled...", i) } return } diff --git a/library/contentaudit/imageaudit/taskscheduler.go b/library/contentaudit/imageaudit/taskscheduler.go index b88320cf..7ee484ee 100644 --- a/library/contentaudit/imageaudit/taskscheduler.go +++ b/library/contentaudit/imageaudit/taskscheduler.go @@ -10,29 +10,14 @@ var defaultImageAuditTaskScheduler *ImageAuditTaskScheduler // 图像审核任务调度器 type ImageAuditTaskScheduler struct { - // 同步标志 - batchFlag chan bool // 批处理同步标志 - // 状态记录 batchId string // 当前批次号 } func initScheduler(cfg *configcenter.ImageAuditConfig) { defaultImageAuditTaskScheduler = &ImageAuditTaskScheduler{ - batchFlag: make(chan bool, 1), - batchId: genereteBatchId(), + batchId: genereteBatchId(), } - defaultImageAuditTaskScheduler.batchFlag <- true -} - -// 批处理上锁 -func (s *ImageAuditTaskScheduler) lock() { - <-s.batchFlag -} - -// 批处理解锁 -func (s *ImageAuditTaskScheduler) unLock() { - s.batchFlag <- true } // 生成批次号 diff --git a/library/contentaudit/textaudit/taskscheduler.go b/library/contentaudit/textaudit/taskscheduler.go index 1902b3d7..eb7c92e1 100644 --- a/library/contentaudit/textaudit/taskscheduler.go +++ b/library/contentaudit/textaudit/taskscheduler.go @@ -10,29 +10,14 @@ var defaultTextAuditTaskScheduler *TextAuditTaskScheduler // 文字审核任务调度器 type TextAuditTaskScheduler struct { - // 缓冲池、同步标志 - batchFlag chan bool // 批处理同步标志 - // 状态记录 batchId string // 当前批次号 } func initScheduler(cfg *configcenter.TextAuditConfig) { defaultTextAuditTaskScheduler = &TextAuditTaskScheduler{ - batchFlag: make(chan bool, 1), - batchId: genereteBatchId(), + batchId: genereteBatchId(), } - defaultTextAuditTaskScheduler.batchFlag <- true -} - -// 批处理上锁 -func (s *TextAuditTaskScheduler) lock() { - <-s.batchFlag -} - -// 批处理解锁 -func (s *TextAuditTaskScheduler) unLock() { - s.batchFlag <- true } // 生成批次号 diff --git a/library/contentaudit/textaudit/textaudit.go b/library/contentaudit/textaudit/textaudit.go index 8653b016..3f55a23c 100644 --- a/library/contentaudit/textaudit/textaudit.go +++ b/library/contentaudit/textaudit/textaudit.go @@ -39,9 +39,6 @@ func Run(batchId string) (successNum int, failNum int, err error) { return 0, 0, nil } - // 上锁 - defaultTextAuditTaskScheduler.lock() - logger.Info("Text audit batch started, batchId : %v, task number : %v", batchId, len(textaudittasks)) // 1.构建批量任务控制块,初始化必要的actionMap、填充送审text等信息 ctrlBlock := NewTextAuditTaskBatchControlBlock(textaudittasks, batchId) @@ -68,8 +65,6 @@ func Run(batchId string) (successNum int, failNum int, err error) { // 4.处理应答 err = handleScanTextResponse(ctrlBlock, _result) - // 解锁 - defaultTextAuditTaskScheduler.unLock() successNum = len(textaudittasks) return }