diff --git a/app/mix/cmd/main.go b/app/mix/cmd/main.go index caeaa0d7..aabad84a 100644 --- a/app/mix/cmd/main.go +++ b/app/mix/cmd/main.go @@ -75,7 +75,11 @@ func main() { msg := fmt.Sprintf("Service init fail, err: %v", err) PrintAndExit(msg) } - service.DefaultCronService.ReadLoggerConfig(cfg.Log) + err = service.DefaultCronService.Init(cfg) + if err != nil { + msg := fmt.Sprintf("CronService init fail, err: %v", err) + PrintAndExit(msg) + } // 连接到图像审核任务 service.DefaultService.ConnectToImageAudit() @@ -90,9 +94,6 @@ func main() { // 初始化媒体填充服务 mediafiller.Init(cfg.ServerInfo) - // 启动定时任务 - service.DefaultCronService.Run() - // 初始化http server router := httpengine.NewRouter() middleware.InitJwtAuthenticator(service.DefaultService.OpVerifyToken) diff --git a/app/mix/conf/cfg.go b/app/mix/conf/cfg.go index 3945c817..95fe3e91 100644 --- a/app/mix/conf/cfg.go +++ b/app/mix/conf/cfg.go @@ -16,4 +16,5 @@ type ConfigSt struct { RedisConfig *configcenter.RedisConfig `json:"redis" yaml:"redis"` // redis ServerInfo *configcenter.ServerInfoConfig `json:"server_info" yaml:"server_info"` // 服务器信息 TextAudit *configcenter.TextAuditConfig `json:"textaudit" yaml:"textaudit"` // 文字内容审核服务 + XxlJob *configcenter.XxlJobConfig `json:"xxl_job" yaml:"xxl_job"` // xxl-job作业系统 } diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index 6b95b20f..dd599c8f 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -331,8 +331,8 @@ func (s *Service) ApiUpdateAccount(ctx *gin.Context, req *accountproto.ApiUpdate } // 创建审核任务 - imageaudittask := s.CreateUpdateAccountImageAudit(ctx, oldAccount, req.Account) - textaudittask := s.CreateUpdateAccountTextAudit(ctx, oldAccount, req.Account) + imageaudittasks := s.CreateUpdateAccountImageAudit(ctx, oldAccount, req.Account) + textaudittasks := s.CreateUpdateAccountTextAudit(ctx, oldAccount, req.Account) // 用户只允许修改昵称和头像 if err := _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{ @@ -348,9 +348,7 @@ func (s *Service) ApiUpdateAccount(ctx *gin.Context, req *accountproto.ApiUpdate } // 审核任务加入队列 - if imageaudittask != nil { - imageaudit.AddTask(imageaudittask) - } + imageaudit.AddTasks(imageaudittasks) if textaudittask != nil { textaudit.AddTask(textaudittask) } diff --git a/app/mix/service/cronservice.go b/app/mix/service/cronservice.go index 83a7116b..cb97b7b8 100644 --- a/app/mix/service/cronservice.go +++ b/app/mix/service/cronservice.go @@ -2,26 +2,11 @@ package service import ( "context" - "fmt" - "service/api/consts" - "service/api/errcode" - accountproto "service/api/proto/account/proto" - daily_statementproto "service/api/proto/daily_statement/proto" - streamerproto "service/api/proto/streamer/proto" - vasproto "service/api/proto/vas/proto" - "service/bizcommon/util" - "service/dbstruct" - "service/library/configcenter" - "service/library/contentaudit/imageaudit" - "service/library/contentaudit/textaudit" + "errors" + "service/app/mix/conf" "service/library/logger" - "service/library/redis" - "time" - "github.com/gin-gonic/gin" - "github.com/go-co-op/gocron" xxl "github.com/xxl-job/xxl-job-executor-go" - goproto "google.golang.org/protobuf/proto" ) // 20240117: 接入xxl-job定时任务作业系统 @@ -35,24 +20,41 @@ type CronService struct { } func NewCronService() *CronService { + return new(CronService) +} + +func (s *CronService) Init(c any) (err error) { + cfg, ok := c.(*conf.ConfigSt) + if !ok { + err = errors.New("cfg struct type not expected") + logger.Error("service init, err: %v", err) + return + } + + s.fileAbsPath = cfg.Log.FileAbsPath + exec := xxl.NewExecutor( - xxl.ServerAddr("http://127.0.0.1:9800/xxl-job-admin"), - xxl.AccessToken("default_token"), //请求令牌(默认为空) - xxl.ExecutorIp("127.0.0.1"), //可自动获取 - xxl.ExecutorPort("9801"), //默认9999(非必填) - xxl.RegistryKey("golang-jobs"), //执行器名称 - xxl.SetLogger(&XxlLogger{}), //自定义日志 + xxl.ServerAddr(cfg.XxlJob.ServerAddr), //xxl-job-admin部署地址 + xxl.AccessToken(cfg.XxlJob.AccessToken), //请求令牌(默认为空) + xxl.ExecutorIp(cfg.XxlJob.ExecutorIp), //可自动获取 + xxl.ExecutorPort(cfg.XxlJob.ExecutorPort), //默认9999(非必填) + xxl.RegistryKey(cfg.XxlJob.RegistryKey), //执行器名称 + xxl.SetLogger(&XxlLogger{}), //自定义日志 ) exec.Init() exec.Use(customMiddleware) //设置日志查看handler + exec.RegTask("reload_recomm_list", s.ReloadRecommList) + exec.RegTask("clear_veri_code_send_times", s.ClearVeriCodeSendTimes) + exec.RegTask("create_daily_statement", s.CreateDailyStatement) exec.LogHandler(customLogHandle) //注册任务handler - if err := exec.Run(); err != nil { + + if err = exec.Run(); err != nil { logger.Error("xxl-job executor init fail: %v", err) return } - return new(CronService) + return } // 自定义日志处理器 @@ -84,147 +86,3 @@ func customMiddleware(tf xxl.TaskFunc) xxl.TaskFunc { return res } } - -func (s *CronService) ReadLoggerConfig(config configcenter.LoggerConfig) { - s.fileAbsPath = config.FileAbsPath -} - -func (s *CronService) Run() { - s.ReloadRecommList() - s.ImageAuditBatch() - s.TextAuditBatch() - s.ClearVeriCodeSendTimes() - s.CreateDailyStatement() -} - -func (s *CronService) ReloadRecommList() { - // 每五分钟运行一次 task - // 申请一个调度器 - ctx := &gin.Context{} - scheduler := gocron.NewScheduler(time.UTC) - scheduler.Every(5).Minute().Do(func() { - logger.Info("Refreshing recommendation list cached in redis...") - // midList, _, ec := DefaultService.OpGetIsFollowedAccountRelationCount(ctx) - // if ec != errcode.ErrCodeAccountRelationSrvOk { - // logger.Error("OpGetAccountRelationCount fail, ec: %v", ec) - // scheduler.Stop() - // } - list, ec := DefaultService.OpGetStreamerList(ctx, &streamerproto.OpListReq{ - Sort: "-fans", - }) - if ec != errcode.ErrCodeAccountRelationSrvOk { - logger.Error("OpGetStreamerList fail, ec: %v", ec) - scheduler.Stop() - } - midList := make([]int64, len(list)) - for i, streamer := range list { - midList[i] = util.DerefInt64(streamer.Mid) - } - err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", midList, 0) - if err != nil { - logger.Error("Redis cache fail, err: %v", err) - } - logger.Info("Refresh recommendation list cached in redis accomplished...") - }) - scheduler.StartAsync() -} - -func (s *CronService) ImageAuditBatch() { - scheduler := gocron.NewScheduler(time.UTC) - scheduler.Every(10).Second().Do(func() { - imageaudit.GiveNoticeToBatch() - }) - scheduler.StartAsync() -} - -func (s *CronService) TextAuditBatch() { - scheduler := gocron.NewScheduler(time.UTC) - scheduler.Every(10).Second().Do(func() { - textaudit.GiveNoticeToBatch() - }) - scheduler.StartAsync() -} - -func (s *CronService) ClearVeriCodeSendTimes() { - loc, _ := time.LoadLocation("Asia/Shanghai") - scheduler := gocron.NewScheduler(loc) - - scheduler.Every(1).Day().At("00:00").Do(func() { - logger.Info("Clearing vericode_send_times collection...") - ctx := &gin.Context{} - if err := _DefaultVeriCodeSendTimes.OpClear(ctx); err != nil { - logger.Error("Clear vericode_send_times collection fail: %v", err) - } - logger.Info("vericode_send_times collection has been cleared") - }) - scheduler.StartAsync() -} - -// 统计每日报表 -func (s *CronService) CreateDailyStatement() { - loc, _ := time.LoadLocation("Asia/Shanghai") - scheduler := gocron.NewScheduler(loc) - - scheduler.CronWithSeconds("0 0 * * * *").Do(func() { - - //拿到现在的时间戳 - nowTimeStamp := util.GetHourStartTimeStamp(time.Now()) - - //获取上个小时时间段 - startTimeStamp := nowTimeStamp - int64(60*60) - endTimeStamp := nowTimeStamp - int64(1) - starttime := time.Unix(startTimeStamp, 0) - endtime := time.Unix(endTimeStamp, 0) - - //获取H5接口访问量 - logpath := fmt.Sprintf("%v_%02d%02d%02d.Global", s.fileAbsPath, starttime.Year(), starttime.Month(), starttime.Day()) - count, err := DefaultScriptsService.QueryCallCount(consts.H5CallUrl, logpath) - if err != nil { - logger.Error("query h5 call count fail : %v", err) - } - - //获取用户总量 - accountCount, err := _DefaultAccount.OpCount(&gin.Context{}, &accountproto.OpCountReq{ - CtLowerBound: goproto.Int64(int64(consts.AppEnterProductionTime)), - }) - if err != nil { - logger.Error("_DefaultAccount OpCount fail : %v", err) - } - - //获取订单总量 - orderCounts, err := _DefaultVas.GetOrderCountGroupByStatus(&gin.Context{}, &vasproto.GetOrderByStatusReq{ - CtStart: goproto.Int64(consts.AppEnterProductionTime), - CtEnd: nil, - }) - if err != nil { - logger.Error("_DefaultVas GetOrderCountByStatus fail : %v", err) - } - - finishedOrderCount := int32(0) - allOrderCount := int32(0) - for _, orderCount := range orderCounts { - if util.DerefInt32(orderCount.OrderStatus) != 0 { - finishedOrderCount += util.DerefInt32(orderCount.Count) - } - allOrderCount += util.DerefInt32(orderCount.Count) - } - - dailyStatement := &dbstruct.DailyStatement{ - H5CallCount: goproto.Int64(int64(count)), - RegisteredUserCount: goproto.Int64(accountCount), - OrderCreatedCount: goproto.Int64(int64(allOrderCount)), - OrderFinishedCount: goproto.Int64(int64(finishedOrderCount)), - StartTime: goproto.Int64(startTimeStamp), - EndTime: goproto.Int64(endTimeStamp), - } - err = _DefaultDailyStatement.OpCreate(&gin.Context{}, &daily_statementproto.OpCreateReq{ - DailyStatement: dailyStatement, - }) - if err != nil { - logger.Error("_DefaultDailyStatement OpCreate fail : %v", err) - } - - logger.Info("%v - %v statement data has created...", starttime, endtime) - }) - scheduler.StartAsync() -} diff --git a/app/mix/service/imageauditservice.go b/app/mix/service/imageauditservice.go index b1fd30a9..39adc1c5 100644 --- a/app/mix/service/imageauditservice.go +++ b/app/mix/service/imageauditservice.go @@ -6,27 +6,19 @@ import ( "service/app/mix/dao" "service/bizcommon/util" "service/dbstruct" - "service/library/contentaudit/imageaudit" "github.com/gin-gonic/gin" goproto "google.golang.org/protobuf/proto" ) -func (s *Service) CreateUpdateAccountImageAudit(ctx *gin.Context, oldAccount *dbstruct.Account, newAccount *dbstruct.Account) (task *imageaudit.ImageAuditTaskControlBlock) { +func (s *Service) CreateUpdateAccountImageAudit(ctx *gin.Context, oldAccount *dbstruct.Account, newAccount *dbstruct.Account) (tasks []*dbstruct.ImageAuditTask) { if newAccount.Avatar == nil { - return + return nil } - rollBackFunc := func() error { - return _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{ - Account: &dbstruct.Account{ - Mid: oldAccount.Mid, - Avatar: oldAccount.Avatar, - }, - }) - } + tasks = make([]*dbstruct.ImageAuditTask, 0) - task = imageaudit.NewImageAuditTaskControlBlock(&dbstruct.ImageAuditTask{ + tasks = append(tasks, &dbstruct.ImageAuditTask{ RouteUrl: goproto.String(ctx.Request.URL.Path), AssociativeDatabase: goproto.String("account"), AssociativeTableName: goproto.String("account"), @@ -34,25 +26,16 @@ func (s *Service) CreateUpdateAccountImageAudit(ctx *gin.Context, oldAccount *db AssociativeTableColumn: goproto.String("avatar"), AuditedMedia: newAccount.Avatar, OldMedia: oldAccount.Avatar, - }, rollBackFunc) + }) return } -func (s *Service) CreateUpdateStreamerImageAudit(ctx *gin.Context, oldStreamer *dbstruct.Streamer, newStreamer *dbstruct.Streamer) (tasks []*imageaudit.ImageAuditTaskControlBlock) { - tasks = make([]*imageaudit.ImageAuditTaskControlBlock, 0) +func (s *Service) CreateUpdateStreamerImageAudit(ctx *gin.Context, oldStreamer *dbstruct.Streamer, newStreamer *dbstruct.Streamer) (tasks []*dbstruct.ImageAuditTask) { + tasks = make([]*dbstruct.ImageAuditTask, 0) if newStreamer.Cover != nil { - rollBackFunc := func() error { - return _DefaultStreamer.OpUpdate(ctx, &streamerproto.OpUpdateReq{ - Streamer: &dbstruct.Streamer{ - Mid: oldStreamer.Mid, - Cover: oldStreamer.Cover, - }, - }) - } - - tasks = append(tasks, imageaudit.NewImageAuditTaskControlBlock(&dbstruct.ImageAuditTask{ + tasks = append(tasks, &dbstruct.ImageAuditTask{ RouteUrl: goproto.String(ctx.Request.URL.Path), AssociativeDatabase: goproto.String("streamer"), AssociativeTableName: goproto.String("streamer"), @@ -60,20 +43,11 @@ func (s *Service) CreateUpdateStreamerImageAudit(ctx *gin.Context, oldStreamer * AssociativeTableColumn: goproto.String("cover"), AuditedMedia: newStreamer.Cover, OldMedia: oldStreamer.Cover, - }, rollBackFunc)) + }) } if newStreamer.Album != nil { - rollBackFunc := func() error { - return _DefaultStreamer.OpUpdate(ctx, &streamerproto.OpUpdateReq{ - Streamer: &dbstruct.Streamer{ - Mid: oldStreamer.Mid, - Album: oldStreamer.Album, - }, - }) - } - - tasks = append(tasks, imageaudit.NewImageAuditTaskControlBlock(&dbstruct.ImageAuditTask{ + tasks = append(tasks, &dbstruct.ImageAuditTask{ RouteUrl: goproto.String(ctx.Request.URL.Path), AssociativeDatabase: goproto.String("streamer"), AssociativeTableName: goproto.String("streamer"), @@ -81,7 +55,7 @@ func (s *Service) CreateUpdateStreamerImageAudit(ctx *gin.Context, oldStreamer * AssociativeTableColumn: goproto.String("album"), AuditedMedia: newStreamer.Album, OldMedia: oldStreamer.Album, - }, rollBackFunc)) + }) } return diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go new file mode 100644 index 00000000..99b614ee --- /dev/null +++ b/app/mix/service/xxljob_tasks.go @@ -0,0 +1,149 @@ +package service + +import ( + "context" + "fmt" + "service/api/consts" + "service/api/errcode" + accountproto "service/api/proto/account/proto" + daily_statementproto "service/api/proto/daily_statement/proto" + streamerproto "service/api/proto/streamer/proto" + vasproto "service/api/proto/vas/proto" + "service/bizcommon/util" + "service/dbstruct" + "service/library/contentaudit/textaudit" + "service/library/logger" + "service/library/redis" + "time" + + "github.com/gin-gonic/gin" + "github.com/go-co-op/gocron" + xxl "github.com/xxl-job/xxl-job-executor-go" + goproto "google.golang.org/protobuf/proto" +) + +func (s *CronService) ReloadRecommList(ctx context.Context, param *xxl.RunReq) (msg string) { + logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) + logger.Info("Refreshing recommendation list cached in redis...") + list, ec := DefaultService.OpGetStreamerList(&gin.Context{}, &streamerproto.OpListReq{ + Sort: "-fans", + }) + if ec != errcode.ErrCodeAccountRelationSrvOk { + logger.Error("OpGetStreamerList fail, ec: %v", ec) + return fmt.Sprintf("OpGetStreamerList fail, ec: %v", ec) + } + midList := make([]int64, len(list)) + for i, streamer := range list { + midList[i] = util.DerefInt64(streamer.Mid) + } + err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", midList, 0) + if err != nil { + logger.Error("Redis cache fail, err: %v", err) + return fmt.Sprintf("Redis cache fail, err: %v", err) + } + logger.Info("Refresh recommendation list cached in redis accomplished...") + return "Refresh recommendation list cached in redis accomplished" +} + +func (s *CronService) ClearVeriCodeSendTimes(ctx context.Context, param *xxl.RunReq) (msg string) { + logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) + logger.Info("Clearing vericode_send_times collection...") + if err := _DefaultVeriCodeSendTimes.OpClear(&gin.Context{}); err != nil { + logger.Error("Clear vericode_send_times collection fail: %v", err) + return fmt.Sprintf("Clear vericode_send_times collection fail: %v", err) + } + logger.Info("vericode_send_times collection has been cleared") + return "vericode_send_times collection has been cleared" +} + +// 统计每日报表 +func (s *CronService) CreateDailyStatement(ctx context.Context, param *xxl.RunReq) (msg string) { + logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) + //拿到现在的时间戳 + nowTimeStamp := util.GetHourStartTimeStamp(time.Now()) + + //获取上个小时时间段 + startTimeStamp := nowTimeStamp - int64(60*60) + endTimeStamp := nowTimeStamp - int64(1) + starttime := time.Unix(startTimeStamp, 0) + endtime := time.Unix(endTimeStamp, 0) + + //获取H5接口访问量 + logpath := fmt.Sprintf("%v_%02d%02d%02d.Global", s.fileAbsPath, starttime.Year(), starttime.Month(), starttime.Day()) + count, err := DefaultScriptsService.QueryCallCount(consts.H5CallUrl, logpath) + if err != nil { + logger.Error("query h5 call count fail : %v", err) + return fmt.Sprintf("query h5 call count fail : %v", err) + } + + //获取用户总量 + accountCount, err := _DefaultAccount.OpCount(&gin.Context{}, &accountproto.OpCountReq{ + CtLowerBound: goproto.Int64(int64(consts.AppEnterProductionTime)), + }) + if err != nil { + logger.Error("_DefaultAccount OpCount fail : %v", err) + return fmt.Sprintf("_DefaultAccount OpCount fail : %v", err) + } + + //获取订单总量 + orderCounts, err := _DefaultVas.GetOrderCountGroupByStatus(&gin.Context{}, &vasproto.GetOrderByStatusReq{ + CtStart: goproto.Int64(consts.AppEnterProductionTime), + CtEnd: nil, + }) + if err != nil { + logger.Error("_DefaultVas GetOrderCountByStatus fail : %v", err) + return fmt.Sprintf("_DefaultVas GetOrderCountByStatus fail : %v", err) + } + + finishedOrderCount := int32(0) + allOrderCount := int32(0) + for _, orderCount := range orderCounts { + if util.DerefInt32(orderCount.OrderStatus) != 0 { + finishedOrderCount += util.DerefInt32(orderCount.Count) + } + allOrderCount += util.DerefInt32(orderCount.Count) + } + + dailyStatement := &dbstruct.DailyStatement{ + H5CallCount: goproto.Int64(int64(count)), + RegisteredUserCount: goproto.Int64(accountCount), + OrderCreatedCount: goproto.Int64(int64(allOrderCount)), + OrderFinishedCount: goproto.Int64(int64(finishedOrderCount)), + StartTime: goproto.Int64(startTimeStamp), + EndTime: goproto.Int64(endTimeStamp), + } + err = _DefaultDailyStatement.OpCreate(&gin.Context{}, &daily_statementproto.OpCreateReq{ + DailyStatement: dailyStatement, + }) + if err != nil { + logger.Error("_DefaultDailyStatement OpCreate fail : %v", err) + return fmt.Sprintf("_DefaultDailyStatement OpCreate fail : %v", err) + } + + logger.Info("%v - %v statement data has created...", starttime, endtime) + return fmt.Sprintf("%v - %v statement data has created...", starttime, endtime) +} + +func (s *CronService) ImageAuditBatch() { + scheduler := gocron.NewScheduler(time.UTC) + scheduler.Every(10).Second().Do(func() { + // imageaudit.GiveNoticeToBatch() + }) + scheduler.StartAsync() +} + +// func (s *CronService) ImageAuditBatch() { +// scheduler := gocron.NewScheduler(time.UTC) +// scheduler.Every(10).Second().Do(func() { +// imageaudit.GiveNoticeToBatch() +// }) +// scheduler.StartAsync() +// } + +func (s *CronService) TextAuditBatch() { + scheduler := gocron.NewScheduler(time.UTC) + scheduler.Every(10).Second().Do(func() { + textaudit.GiveNoticeToBatch() + }) + scheduler.StartAsync() +} diff --git a/etc/mix/mix-test.yaml b/etc/mix/mix-test.yaml index ea1d7ec3..3158ac12 100644 --- a/etc/mix/mix-test.yaml +++ b/etc/mix/mix-test.yaml @@ -90,4 +90,11 @@ redis: prefix: "" server_info: - file_server_domain_name: "https://file.wishpaldev.tech/" \ No newline at end of file + file_server_domain_name: "https://file.wishpaldev.tech/" + +xxl_job: + server_addr: "http://127.0.0.1:9800/xxl-job-admin" + access_token: "default_token" + executor_ip: "127.0.0.1" + executor_port: "9801" + registry_key: "golang-jobs-executor" \ No newline at end of file diff --git a/library/configcenter/configcenter.go b/library/configcenter/configcenter.go index d7606c5b..ca59e20d 100644 --- a/library/configcenter/configcenter.go +++ b/library/configcenter/configcenter.go @@ -134,6 +134,15 @@ type ServerInfoConfig struct { FileServerDomainName string `json:"file_server_domain_name" yaml:"file_server_domain_name"` } +// xxl-job服务配置 +type XxlJobConfig struct { + ServerAddr string `json:"server_addr" yaml:"server_addr"` + AccessToken string `json:"access_token" yaml:"access_token"` + ExecutorIp string `json:"executor_ip" yaml:"executor_ip"` + ExecutorPort string `json:"executor_port" yaml:"executor_port"` + RegistryKey string `json:"registry_key" yaml:"registry_key"` +} + func LoadConfig(configFilePath string, cfg interface{}) error { cfgStr, err := ioutil.ReadFile(configFilePath) if err != nil { diff --git a/library/contentaudit/imageaudit/client.go b/library/contentaudit/imageaudit/client.go index 774806d7..cb8e4385 100644 --- a/library/contentaudit/imageaudit/client.go +++ b/library/contentaudit/imageaudit/client.go @@ -69,15 +69,15 @@ func Init(cfg *configcenter.ImageAuditConfig) (err error) { // 初始化调度器 initScheduler(cfg) - // 启动调度任务 - defaultImageAuditTaskScheduler.Run() + // // 启动调度任务 + // defaultImageAuditTaskScheduler.Run() return } -func GiveNoticeToBatch() { - defaultImageAuditTaskScheduler.GiveNoticeToBatch() -} +// func GiveNoticeToBatch() { +// defaultImageAuditTaskScheduler.GiveNoticeToBatch() +// } func ConnectToImageAuditService(serivce ImageAuditService) { _DefaultImageAudit = serivce diff --git a/library/contentaudit/imageaudit/control_block.go b/library/contentaudit/imageaudit/control_block.go new file mode 100644 index 00000000..6b4da948 --- /dev/null +++ b/library/contentaudit/imageaudit/control_block.go @@ -0,0 +1,59 @@ +package imageaudit + +import ( + "fmt" + "service/api/consts" + "service/bizcommon/util" + "service/dbstruct" + + goproto "google.golang.org/protobuf/proto" +) + +// 图像审核任务控制块 +// ActionId设计初衷:由于图像审核是定时任务触发的批量作业,如果在一次作业间隔有针对同一个图像媒体的多次更新,则会提交关于它的多次审核,需要配合乐观锁保证数据一致性 +type ImageAuditTaskControlBlock struct { + // 静态元素 + ActionId string // 审核动作id号,由图像审核实体数据库四要素拼接而成,用于指示对数据库-表-单条数据-图像字段的审核动作 + ImageAuditTask *dbstruct.ImageAuditTask // 审核任务 + RollbackFunc func() error // 审核失败时的回退方法 + + // 操作对象 + Images []*dbstruct.MediaComponent // 被审核的图像序列 + + // 动态元素 + IsTaskPassed bool // 任务状态,仅当任务已确定完成,即已完成审核任务的分片数 = 审核任务分片数时,才有意义 + AuditedFragmentsNum int // 已完成审核任务的分片数 + IsGivingNoticeToBatch bool // 是否进行批处理 +} + +// 新建图像审核任务块 +func NewImageAuditTaskControlBlock(task *dbstruct.ImageAuditTask, rollbackFunc func() error) (tcb *ImageAuditTaskControlBlock) { + if task == nil || task.AuditedMedia == nil { + return + } + fragmentNum := len(util.DerefInt64Slice(task.AuditedMedia.ImageIds)) + if fragmentNum == 0 { + return + } + + task.BatchId = goproto.String(defaultImageAuditTaskScheduler.batchId) + task.Status = goproto.Int64(consts.ImageAudit_Created) + if fragmentNum == 1 { + task.IsFragmented = goproto.Int64(0) + task.FragmentsNum = goproto.Int64(1) + } else { + task.IsFragmented = goproto.Int64(1) + task.FragmentsNum = goproto.Int64(int64(fragmentNum)) + } + + tcb = &ImageAuditTaskControlBlock{ + ActionId: fmt.Sprintf("%v%v%v%v", util.DerefString(task.AssociativeDatabase), util.DerefString(task.AssociativeTableName), + util.DerefInt64(task.AssociativeTableId), util.DerefString(task.AssociativeTableColumn)), + ImageAuditTask: task, + RollbackFunc: rollbackFunc, + IsTaskPassed: true, + AuditedFragmentsNum: 0, + IsGivingNoticeToBatch: false, + } + return +} diff --git a/library/contentaudit/imageaudit/imageaudit.go b/library/contentaudit/imageaudit/imageaudit.go index 42dc751d..e6392032 100644 --- a/library/contentaudit/imageaudit/imageaudit.go +++ b/library/contentaudit/imageaudit/imageaudit.go @@ -19,6 +19,20 @@ import ( "github.com/gin-gonic/gin" ) +// 图像审核主逻辑 +func Run() (err error) { + + // 更新batchId + batchId := defaultImageAuditTaskScheduler.batchId + defaultImageAuditTaskScheduler.batchId = genereteBatchId() + + // 查询该批次所有审核任务 + imageaudittasks, err := _DefaultImageAuditTask.OpList(&gin.Context{}, &imageaudittaskproto.OpListReq{ + BatchId: goproto.String(batchId), + Status: goproto.Int64(consts.ImageAudit_Created), + }) +} + // 图像审核主逻辑 func executeImageAuditTasks(tasks []*ImageAuditTaskControlBlock, batchId string) (err error) { diff --git a/library/contentaudit/imageaudit/task.go b/library/contentaudit/imageaudit/task.go index c5e5fe29..269d725f 100644 --- a/library/contentaudit/imageaudit/task.go +++ b/library/contentaudit/imageaudit/task.go @@ -1,7 +1,6 @@ package imageaudit import ( - "fmt" "service/api/consts" imageauditproto "service/api/proto/imageaudit/proto" imageaudittaskproto "service/api/proto/imageaudittask/proto" @@ -13,86 +12,49 @@ import ( goproto "google.golang.org/protobuf/proto" ) -// 图像审核任务控制块 -// ActionId设计初衷:由于图像审核是定时任务触发的批量作业,如果在一次作业间隔有针对同一个图像媒体的多次更新,则会提交关于它的多次审核,需要配合乐观锁保证数据一致性 -type ImageAuditTaskControlBlock struct { - // 静态元素 - ActionId string // 审核动作id号,由图像审核实体数据库四要素拼接而成,用于指示对数据库-表-单条数据-图像字段的审核动作 - ImageAuditTask *dbstruct.ImageAuditTask // 审核任务 - RollbackFunc func() error // 审核失败时的回退方法 - - // 操作对象 - Images []*dbstruct.MediaComponent // 被审核的图像序列 - - // 动态元素 - IsTaskPassed bool // 任务状态,仅当任务已确定完成,即已完成审核任务的分片数 = 审核任务分片数时,才有意义 - AuditedFragmentsNum int // 已完成审核任务的分片数 - IsGivingNoticeToBatch bool // 是否进行批处理 +func AddTasks(tasks []*dbstruct.ImageAuditTask) error { + for _, task := range tasks { + err := AddTask(task) + if err != nil { + return err + } + } + return nil } -// 新建图像审核任务块 -func NewImageAuditTaskControlBlock(task *dbstruct.ImageAuditTask, rollbackFunc func() error) (tcb *ImageAuditTaskControlBlock) { +func AddTask(task *dbstruct.ImageAuditTask) error { if task == nil || task.AuditedMedia == nil { - return + return nil } fragmentNum := len(util.DerefInt64Slice(task.AuditedMedia.ImageIds)) if fragmentNum == 0 { - return + return nil } task.BatchId = goproto.String(defaultImageAuditTaskScheduler.batchId) task.Status = goproto.Int64(consts.ImageAudit_Created) + if fragmentNum == 1 { task.IsFragmented = goproto.Int64(0) task.FragmentsNum = goproto.Int64(1) + + // 写入图像审核表 + if err := prepareFragmentedImageAuditTask(task); err != nil { + return err + } } else { task.IsFragmented = goproto.Int64(1) task.FragmentsNum = goproto.Int64(int64(fragmentNum)) - } - tcb = &ImageAuditTaskControlBlock{ - ActionId: fmt.Sprintf("%v%v%v%v", util.DerefString(task.AssociativeDatabase), util.DerefString(task.AssociativeTableName), - util.DerefInt64(task.AssociativeTableId), util.DerefString(task.AssociativeTableColumn)), - ImageAuditTask: task, - RollbackFunc: rollbackFunc, - IsTaskPassed: true, - AuditedFragmentsNum: 0, - IsGivingNoticeToBatch: false, - } - return -} - -func AddTask(task *ImageAuditTaskControlBlock) { - if err := task.Prepare(); err != nil { - logger.Error("ImageAuditTaskPrepareException: %v", err) - return - } - defaultImageAuditTaskScheduler.AddTask(task) -} - -// 将图像审核任务信息写入数据库,准备送审 -func (task *ImageAuditTaskControlBlock) Prepare() (err error) { - - if task == nil { - return fmt.Errorf("task is nil, check your input") - } - - // 1.写入图像审核表 - if util.DerefInt64(task.ImageAuditTask.IsFragmented) == 1 { - if task.Images, err = prepareFragmentedImageAuditTask(task.ImageAuditTask); err != nil { + // 写入图像审核表 + if err := prepareNotFragmentedImageAuditTask(task); err != nil { return err } - } else { - if err := prepareNotFragmentedImageAuditTask(task.ImageAuditTask); err != nil { - return err - } - task.Images = make([]*dbstruct.MediaComponent, 1) - task.Images[0] = task.ImageAuditTask.AuditedMedia } - // 2.写入图像审核任务表 + // 写入图像审核任务表 if err := _DefaultImageAuditTask.OpCreate(&gin.Context{}, &imageaudittaskproto.OpCreateReq{ - ImageAuditTask: task.ImageAuditTask, + ImageAuditTask: task, }); err != nil { logger.Error("Imageaudittask OpCreate failed: %v", err) return err @@ -102,21 +64,20 @@ func (task *ImageAuditTaskControlBlock) Prepare() (err error) { } // 根据任务信息,创建单个图片的图片审核,并存入数据库,返回images -func prepareFragmentedImageAuditTask(task *dbstruct.ImageAuditTask) (images []*dbstruct.MediaComponent, err error) { +func prepareFragmentedImageAuditTask(task *dbstruct.ImageAuditTask) (err error) { ctx := &gin.Context{} fragmentsNum := util.DerefInt64(task.FragmentsNum) - images = make([]*dbstruct.MediaComponent, fragmentsNum) imageAudits := make([]*dbstruct.ImageAudit, fragmentsNum) // 将媒体拆分为单个图像并存入图片审核表 imageIds := util.DerefInt64Slice(task.AuditedMedia.ImageIds) for i, imageId := range imageIds { - images[i] = &dbstruct.MediaComponent{ + image := &dbstruct.MediaComponent{ ImageIds: util.Int64Slice([]int64{imageId}), } imageAudits[i] = &dbstruct.ImageAudit{ - AuditedMedia: images[i], + AuditedMedia: image, BatchId: task.BatchId, Status: goproto.Int64(consts.ImageAudit_Created), } diff --git a/library/contentaudit/imageaudit/taskscheduler.go b/library/contentaudit/imageaudit/taskscheduler.go index 3a1b4cbe..fb25702e 100644 --- a/library/contentaudit/imageaudit/taskscheduler.go +++ b/library/contentaudit/imageaudit/taskscheduler.go @@ -3,7 +3,6 @@ package imageaudit import ( "fmt" "service/library/configcenter" - "service/library/logger" "time" ) @@ -11,12 +10,8 @@ var defaultImageAuditTaskScheduler *ImageAuditTaskScheduler // 图像审核任务调度器 type ImageAuditTaskScheduler struct { - // 缓冲池、同步标志 - taskBuffer chan *ImageAuditTaskControlBlock // 缓存待处理任务的缓冲池 - batchFlag chan bool // 批处理同步标志 - - // 数据存储 - taskPacket []*ImageAuditTaskControlBlock // 一次批处理的任务包 + // // 同步标志 + batchFlag chan bool // 批处理同步标志 // 状态记录 batchId string // 当前批次号 @@ -24,26 +19,14 @@ type ImageAuditTaskScheduler struct { func initScheduler(cfg *configcenter.ImageAuditConfig) { defaultImageAuditTaskScheduler = &ImageAuditTaskScheduler{ - taskBuffer: make(chan *ImageAuditTaskControlBlock, cfg.TaskBufferSize), - batchFlag: make(chan bool, 1), - taskPacket: make([]*ImageAuditTaskControlBlock, 0), - batchId: genereteBatchId(), + // taskBuffer: make(chan *ImageAuditTaskControlBlock, cfg.TaskBufferSize), + batchFlag: make(chan bool, 1), + // taskPacket: make([]*ImageAuditTaskControlBlock, 0), + batchId: genereteBatchId(), } defaultImageAuditTaskScheduler.batchFlag <- true } -// 将任务加入缓冲池 -func (s *ImageAuditTaskScheduler) AddTask(task *ImageAuditTaskControlBlock) { - s.taskBuffer <- task -} - -// 通知进行批处理 -func (s *ImageAuditTaskScheduler) GiveNoticeToBatch() { - s.taskBuffer <- &ImageAuditTaskControlBlock{ - IsGivingNoticeToBatch: true, - } -} - // 批处理上锁 func (s *ImageAuditTaskScheduler) lock() { <-s.batchFlag @@ -54,46 +37,46 @@ func (s *ImageAuditTaskScheduler) unLock() { s.batchFlag <- true } -// 调度方法 -func (s *ImageAuditTaskScheduler) Run() { - go func() { - // 1.循环取出缓冲池中的任务 - // 2.如果任务标志进行批处理,则将任务包打包,进行审核作业,最后清空任务包,生成新的批次号 - // 3.否则,将任务加入任务包中,并为任务建立索引,记录任务执行顺序 - for { - task := <-s.taskBuffer - if task.IsGivingNoticeToBatch { - // 上锁 - s.lock() +// // 调度方法 +// func (s *ImageAuditTaskScheduler) Run() { +// go func() { +// // 1.循环取出缓冲池中的任务 +// // 2.如果任务标志进行批处理,则将任务包打包,进行审核作业,最后清空任务包,生成新的批次号 +// // 3.否则,将任务加入任务包中,并为任务建立索引,记录任务执行顺序 +// for { +// task := <-s.taskBuffer +// if task.IsGivingNoticeToBatch { +// // 上锁 +// s.lock() - // 打包任务 - packet := s.taskPacket - s.taskPacket = make([]*ImageAuditTaskControlBlock, 0) +// // 打包任务 +// packet := s.taskPacket +// s.taskPacket = make([]*ImageAuditTaskControlBlock, 0) - // 取出状态并重置 - batchId := s.batchId - s.batchId = genereteBatchId() - // 执行审核作业 - go func() { - err := executeImageAuditTasks(packet, batchId) - if err != nil { - logger.Error("Batch failed : %v", err) - } - // 解锁 - s.unLock() - }() +// // 取出状态并重置 +// batchId := s.batchId +// s.batchId = genereteBatchId() +// // 执行审核作业 +// go func() { +// err := executeImageAuditTasks(packet, batchId) +// if err != nil { +// logger.Error("Batch failed : %v", err) +// } +// // 解锁 +// s.unLock() +// }() - } else { - s.taskPacket = append(s.taskPacket, task) - } - } - }() -} +// } else { +// s.taskPacket = append(s.taskPacket, task) +// } +// } +// }() +// } // 生成批次号 func genereteBatchId() string { now := time.Now() y, m, d := now.Date() h, mi, s := now.Clock() - return fmt.Sprintf("%d%d%d%d%d%02d", y, m, d, h, mi, s) + return fmt.Sprintf("%d%2d%2d%2d%2d%02d", y, m, d, h, mi, s) } diff --git a/library/taginterceptor/crypt_test.go b/library/taginterceptor/crypt_test.go index b1c8450f..2f2c39e2 100644 --- a/library/taginterceptor/crypt_test.go +++ b/library/taginterceptor/crypt_test.go @@ -9,6 +9,7 @@ import ( "service/library/logger" "service/library/mycrypto" "testing" + "time" ) func Test(t *testing.T) { @@ -25,20 +26,20 @@ func Test(t *testing.T) { logger.Error("cryptoService init, err: %v", err) } - mobilePhone := "Loum3xCiIgJ/tnp+AdxbIw==" - base64DecryptedBytes, _ := base64.StdEncoding.DecodeString(mobilePhone) - phone, _ := mycrypto.CryptoServiceInstance().AES.Decrypt(base64DecryptedBytes) - fmt.Println(string(phone)) - hash := mycrypto.CryptoServiceInstance().SHA256.Encrypt(phone) - fmt.Println(string(hash)) - // mobilePhone := "17738729985" - // rsaBytes, _ := mycrypto.CryptoServiceInstance().RSA.Encrypt([]byte(mobilePhone)) - // base64EncryptedBytes := make([]byte, base64.StdEncoding.EncodedLen(len(rsaBytes))) - // base64.StdEncoding.Encode(base64EncryptedBytes, rsaBytes) - // phoneHash := mycrypto.CryptoServiceInstance().SHA256.Encrypt([]byte(mobilePhone)) + // mobilePhone := "Loum3xCiIgJ/tnp+AdxbIw==" + // base64DecryptedBytes, _ := base64.StdEncoding.DecodeString(mobilePhone) + // phone, _ := mycrypto.CryptoServiceInstance().AES.Decrypt(base64DecryptedBytes) + // fmt.Println(string(phone)) + // hash := mycrypto.CryptoServiceInstance().SHA256.Encrypt(phone) + // fmt.Println(string(hash)) + mobilePhone := "19900000009" + rsaBytes, _ := mycrypto.CryptoServiceInstance().RSA.Encrypt([]byte(mobilePhone)) + base64EncryptedBytes := make([]byte, base64.StdEncoding.EncodedLen(len(rsaBytes))) + base64.StdEncoding.Encode(base64EncryptedBytes, rsaBytes) + phoneHash := mycrypto.CryptoServiceInstance().SHA256.Encrypt([]byte(mobilePhone)) - // fmt.Printf("Time:%v\n", time.Now().Unix()) - // fmt.Printf("RSA:%v\n", string(base64EncryptedBytes)) - // fmt.Printf("PhoneHash:%v\n", phoneHash) + fmt.Printf("Time:%v\n", time.Now().Unix()) + fmt.Printf("RSA:%v\n", string(base64EncryptedBytes)) + fmt.Printf("PhoneHash:%v\n", phoneHash) } diff --git a/vendor/github.com/xxl-job/xxl-job-executor-go/executor.go b/vendor/github.com/xxl-job/xxl-job-executor-go/executor.go index 83b59f12..f92f640f 100644 --- a/vendor/github.com/xxl-job/xxl-job-executor-go/executor.go +++ b/vendor/github.com/xxl-job/xxl-job-executor-go/executor.go @@ -288,7 +288,6 @@ func (e *executor) registry() { e.log.Error("执行器注册失败3:" + string(body)) return } - e.log.Info("执行器注册成功:" + string(body)) }() }