package textaudit import ( "fmt" "service/library/configcenter" "service/library/logger" "time" ) var defaultTextAuditTaskScheduler *TextAuditTaskScheduler // 文字审核任务调度器 type TextAuditTaskScheduler struct { // 缓冲池、同步标志 taskBuffer chan *TextAuditTaskControlBlock // 缓存待处理任务的缓冲池 batchFlag chan bool // 批处理同步标志 // 数据存储 taskPacket []*TextAuditTaskControlBlock // 一次批处理的任务包 // 状态记录 batchId string // 当前批次号 } func initScheduler(cfg *configcenter.TextAuditConfig) { defaultTextAuditTaskScheduler = &TextAuditTaskScheduler{ taskBuffer: make(chan *TextAuditTaskControlBlock, cfg.TaskBufferSize), batchFlag: make(chan bool, 1), taskPacket: make([]*TextAuditTaskControlBlock, 0), batchId: genereteBatchId(), } defaultTextAuditTaskScheduler.batchFlag <- true } // 将任务加入缓冲池 func (s *TextAuditTaskScheduler) AddTask(task *TextAuditTaskControlBlock) { s.taskBuffer <- task } // 通知进行批处理 func (s *TextAuditTaskScheduler) GiveNoticeToBatch() { s.taskBuffer <- &TextAuditTaskControlBlock{ IsGivingNoticeToBatch: true, } } // 批处理上锁 func (s *TextAuditTaskScheduler) lock() { <-s.batchFlag } // 批处理解锁 func (s *TextAuditTaskScheduler) unLock() { s.batchFlag <- true } // 调度方法 func (s *TextAuditTaskScheduler) Run() { go func() { // 1.循环取出缓冲池中的任务 // 2.如果任务标志进行批处理,则将任务包打包,进行审核作业,最后清空任务包,生成新的批次号 // 3.否则,将任务加入任务包中,并为任务建立索引,记录任务执行顺序 for { task := <-s.taskBuffer if task.IsGivingNoticeToBatch { // 上锁 s.lock() // 打包任务 packet := s.taskPacket s.taskPacket = make([]*TextAuditTaskControlBlock, 0) // 取出状态并重置 batchId := s.batchId s.batchId = genereteBatchId() // 执行审核作业 go func() { err := executeTextAuditTasks(packet, batchId) if err != nil { logger.Error("Batch failed : %v", err) } // 解锁 s.unLock() }() } else { s.taskPacket = append(s.taskPacket, task) } } }() } // 生成批次号 func genereteBatchId() string { now := time.Now() y, m, d := now.Date() h, mi, s := now.Clock() return fmt.Sprintf("%d%d%d%d%d%02d", y, m, d, h, mi, s) }