by Robin at 20240118;

This commit is contained in:
Leufolium 2024-01-18 18:19:31 +08:00
parent 19953dddb3
commit 1602de1156
15 changed files with 369 additions and 355 deletions

View File

@ -75,7 +75,11 @@ func main() {
msg := fmt.Sprintf("Service init fail, err: %v", err)
PrintAndExit(msg)
}
service.DefaultCronService.ReadLoggerConfig(cfg.Log)
err = service.DefaultCronService.Init(cfg)
if err != nil {
msg := fmt.Sprintf("CronService init fail, err: %v", err)
PrintAndExit(msg)
}
// 连接到图像审核任务
service.DefaultService.ConnectToImageAudit()
@ -90,9 +94,6 @@ func main() {
// 初始化媒体填充服务
mediafiller.Init(cfg.ServerInfo)
// 启动定时任务
service.DefaultCronService.Run()
// 初始化http server
router := httpengine.NewRouter()
middleware.InitJwtAuthenticator(service.DefaultService.OpVerifyToken)

View File

@ -16,4 +16,5 @@ type ConfigSt struct {
RedisConfig *configcenter.RedisConfig `json:"redis" yaml:"redis"` // redis
ServerInfo *configcenter.ServerInfoConfig `json:"server_info" yaml:"server_info"` // 服务器信息
TextAudit *configcenter.TextAuditConfig `json:"textaudit" yaml:"textaudit"` // 文字内容审核服务
XxlJob *configcenter.XxlJobConfig `json:"xxl_job" yaml:"xxl_job"` // xxl-job作业系统
}

View File

@ -331,8 +331,8 @@ func (s *Service) ApiUpdateAccount(ctx *gin.Context, req *accountproto.ApiUpdate
}
// 创建审核任务
imageaudittask := s.CreateUpdateAccountImageAudit(ctx, oldAccount, req.Account)
textaudittask := s.CreateUpdateAccountTextAudit(ctx, oldAccount, req.Account)
imageaudittasks := s.CreateUpdateAccountImageAudit(ctx, oldAccount, req.Account)
textaudittasks := s.CreateUpdateAccountTextAudit(ctx, oldAccount, req.Account)
// 用户只允许修改昵称和头像
if err := _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{
@ -348,9 +348,7 @@ func (s *Service) ApiUpdateAccount(ctx *gin.Context, req *accountproto.ApiUpdate
}
// 审核任务加入队列
if imageaudittask != nil {
imageaudit.AddTask(imageaudittask)
}
imageaudit.AddTasks(imageaudittasks)
if textaudittask != nil {
textaudit.AddTask(textaudittask)
}

View File

@ -2,26 +2,11 @@ package service
import (
"context"
"fmt"
"service/api/consts"
"service/api/errcode"
accountproto "service/api/proto/account/proto"
daily_statementproto "service/api/proto/daily_statement/proto"
streamerproto "service/api/proto/streamer/proto"
vasproto "service/api/proto/vas/proto"
"service/bizcommon/util"
"service/dbstruct"
"service/library/configcenter"
"service/library/contentaudit/imageaudit"
"service/library/contentaudit/textaudit"
"errors"
"service/app/mix/conf"
"service/library/logger"
"service/library/redis"
"time"
"github.com/gin-gonic/gin"
"github.com/go-co-op/gocron"
xxl "github.com/xxl-job/xxl-job-executor-go"
goproto "google.golang.org/protobuf/proto"
)
// 20240117: 接入xxl-job定时任务作业系统
@ -35,24 +20,41 @@ type CronService struct {
}
func NewCronService() *CronService {
return new(CronService)
}
func (s *CronService) Init(c any) (err error) {
cfg, ok := c.(*conf.ConfigSt)
if !ok {
err = errors.New("cfg struct type not expected")
logger.Error("service init, err: %v", err)
return
}
s.fileAbsPath = cfg.Log.FileAbsPath
exec := xxl.NewExecutor(
xxl.ServerAddr("http://127.0.0.1:9800/xxl-job-admin"),
xxl.AccessToken("default_token"), //请求令牌(默认为空)
xxl.ExecutorIp("127.0.0.1"), //可自动获取
xxl.ExecutorPort("9801"), //默认9999非必填
xxl.RegistryKey("golang-jobs"), //执行器名称
xxl.SetLogger(&XxlLogger{}), //自定义日志
xxl.ServerAddr(cfg.XxlJob.ServerAddr), //xxl-job-admin部署地址
xxl.AccessToken(cfg.XxlJob.AccessToken), //请求令牌(默认为空)
xxl.ExecutorIp(cfg.XxlJob.ExecutorIp), //可自动获取
xxl.ExecutorPort(cfg.XxlJob.ExecutorPort), //默认9999非必填
xxl.RegistryKey(cfg.XxlJob.RegistryKey), //执行器名称
xxl.SetLogger(&XxlLogger{}), //自定义日志
)
exec.Init()
exec.Use(customMiddleware)
//设置日志查看handler
exec.RegTask("reload_recomm_list", s.ReloadRecommList)
exec.RegTask("clear_veri_code_send_times", s.ClearVeriCodeSendTimes)
exec.RegTask("create_daily_statement", s.CreateDailyStatement)
exec.LogHandler(customLogHandle)
//注册任务handler
if err := exec.Run(); err != nil {
if err = exec.Run(); err != nil {
logger.Error("xxl-job executor init fail: %v", err)
return
}
return new(CronService)
return
}
// 自定义日志处理器
@ -84,147 +86,3 @@ func customMiddleware(tf xxl.TaskFunc) xxl.TaskFunc {
return res
}
}
func (s *CronService) ReadLoggerConfig(config configcenter.LoggerConfig) {
s.fileAbsPath = config.FileAbsPath
}
func (s *CronService) Run() {
s.ReloadRecommList()
s.ImageAuditBatch()
s.TextAuditBatch()
s.ClearVeriCodeSendTimes()
s.CreateDailyStatement()
}
func (s *CronService) ReloadRecommList() {
// 每五分钟运行一次 task
// 申请一个调度器
ctx := &gin.Context{}
scheduler := gocron.NewScheduler(time.UTC)
scheduler.Every(5).Minute().Do(func() {
logger.Info("Refreshing recommendation list cached in redis...")
// midList, _, ec := DefaultService.OpGetIsFollowedAccountRelationCount(ctx)
// if ec != errcode.ErrCodeAccountRelationSrvOk {
// logger.Error("OpGetAccountRelationCount fail, ec: %v", ec)
// scheduler.Stop()
// }
list, ec := DefaultService.OpGetStreamerList(ctx, &streamerproto.OpListReq{
Sort: "-fans",
})
if ec != errcode.ErrCodeAccountRelationSrvOk {
logger.Error("OpGetStreamerList fail, ec: %v", ec)
scheduler.Stop()
}
midList := make([]int64, len(list))
for i, streamer := range list {
midList[i] = util.DerefInt64(streamer.Mid)
}
err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", midList, 0)
if err != nil {
logger.Error("Redis cache fail, err: %v", err)
}
logger.Info("Refresh recommendation list cached in redis accomplished...")
})
scheduler.StartAsync()
}
func (s *CronService) ImageAuditBatch() {
scheduler := gocron.NewScheduler(time.UTC)
scheduler.Every(10).Second().Do(func() {
imageaudit.GiveNoticeToBatch()
})
scheduler.StartAsync()
}
func (s *CronService) TextAuditBatch() {
scheduler := gocron.NewScheduler(time.UTC)
scheduler.Every(10).Second().Do(func() {
textaudit.GiveNoticeToBatch()
})
scheduler.StartAsync()
}
func (s *CronService) ClearVeriCodeSendTimes() {
loc, _ := time.LoadLocation("Asia/Shanghai")
scheduler := gocron.NewScheduler(loc)
scheduler.Every(1).Day().At("00:00").Do(func() {
logger.Info("Clearing vericode_send_times collection...")
ctx := &gin.Context{}
if err := _DefaultVeriCodeSendTimes.OpClear(ctx); err != nil {
logger.Error("Clear vericode_send_times collection fail: %v", err)
}
logger.Info("vericode_send_times collection has been cleared")
})
scheduler.StartAsync()
}
// 统计每日报表
func (s *CronService) CreateDailyStatement() {
loc, _ := time.LoadLocation("Asia/Shanghai")
scheduler := gocron.NewScheduler(loc)
scheduler.CronWithSeconds("0 0 * * * *").Do(func() {
//拿到现在的时间戳
nowTimeStamp := util.GetHourStartTimeStamp(time.Now())
//获取上个小时时间段
startTimeStamp := nowTimeStamp - int64(60*60)
endTimeStamp := nowTimeStamp - int64(1)
starttime := time.Unix(startTimeStamp, 0)
endtime := time.Unix(endTimeStamp, 0)
//获取H5接口访问量
logpath := fmt.Sprintf("%v_%02d%02d%02d.Global", s.fileAbsPath, starttime.Year(), starttime.Month(), starttime.Day())
count, err := DefaultScriptsService.QueryCallCount(consts.H5CallUrl, logpath)
if err != nil {
logger.Error("query h5 call count fail : %v", err)
}
//获取用户总量
accountCount, err := _DefaultAccount.OpCount(&gin.Context{}, &accountproto.OpCountReq{
CtLowerBound: goproto.Int64(int64(consts.AppEnterProductionTime)),
})
if err != nil {
logger.Error("_DefaultAccount OpCount fail : %v", err)
}
//获取订单总量
orderCounts, err := _DefaultVas.GetOrderCountGroupByStatus(&gin.Context{}, &vasproto.GetOrderByStatusReq{
CtStart: goproto.Int64(consts.AppEnterProductionTime),
CtEnd: nil,
})
if err != nil {
logger.Error("_DefaultVas GetOrderCountByStatus fail : %v", err)
}
finishedOrderCount := int32(0)
allOrderCount := int32(0)
for _, orderCount := range orderCounts {
if util.DerefInt32(orderCount.OrderStatus) != 0 {
finishedOrderCount += util.DerefInt32(orderCount.Count)
}
allOrderCount += util.DerefInt32(orderCount.Count)
}
dailyStatement := &dbstruct.DailyStatement{
H5CallCount: goproto.Int64(int64(count)),
RegisteredUserCount: goproto.Int64(accountCount),
OrderCreatedCount: goproto.Int64(int64(allOrderCount)),
OrderFinishedCount: goproto.Int64(int64(finishedOrderCount)),
StartTime: goproto.Int64(startTimeStamp),
EndTime: goproto.Int64(endTimeStamp),
}
err = _DefaultDailyStatement.OpCreate(&gin.Context{}, &daily_statementproto.OpCreateReq{
DailyStatement: dailyStatement,
})
if err != nil {
logger.Error("_DefaultDailyStatement OpCreate fail : %v", err)
}
logger.Info("%v - %v statement data has created...", starttime, endtime)
})
scheduler.StartAsync()
}

View File

@ -6,27 +6,19 @@ import (
"service/app/mix/dao"
"service/bizcommon/util"
"service/dbstruct"
"service/library/contentaudit/imageaudit"
"github.com/gin-gonic/gin"
goproto "google.golang.org/protobuf/proto"
)
func (s *Service) CreateUpdateAccountImageAudit(ctx *gin.Context, oldAccount *dbstruct.Account, newAccount *dbstruct.Account) (task *imageaudit.ImageAuditTaskControlBlock) {
func (s *Service) CreateUpdateAccountImageAudit(ctx *gin.Context, oldAccount *dbstruct.Account, newAccount *dbstruct.Account) (tasks []*dbstruct.ImageAuditTask) {
if newAccount.Avatar == nil {
return
return nil
}
rollBackFunc := func() error {
return _DefaultAccount.OpUpdate(ctx, &accountproto.OpUpdateReq{
Account: &dbstruct.Account{
Mid: oldAccount.Mid,
Avatar: oldAccount.Avatar,
},
})
}
tasks = make([]*dbstruct.ImageAuditTask, 0)
task = imageaudit.NewImageAuditTaskControlBlock(&dbstruct.ImageAuditTask{
tasks = append(tasks, &dbstruct.ImageAuditTask{
RouteUrl: goproto.String(ctx.Request.URL.Path),
AssociativeDatabase: goproto.String("account"),
AssociativeTableName: goproto.String("account"),
@ -34,25 +26,16 @@ func (s *Service) CreateUpdateAccountImageAudit(ctx *gin.Context, oldAccount *db
AssociativeTableColumn: goproto.String("avatar"),
AuditedMedia: newAccount.Avatar,
OldMedia: oldAccount.Avatar,
}, rollBackFunc)
})
return
}
func (s *Service) CreateUpdateStreamerImageAudit(ctx *gin.Context, oldStreamer *dbstruct.Streamer, newStreamer *dbstruct.Streamer) (tasks []*imageaudit.ImageAuditTaskControlBlock) {
tasks = make([]*imageaudit.ImageAuditTaskControlBlock, 0)
func (s *Service) CreateUpdateStreamerImageAudit(ctx *gin.Context, oldStreamer *dbstruct.Streamer, newStreamer *dbstruct.Streamer) (tasks []*dbstruct.ImageAuditTask) {
tasks = make([]*dbstruct.ImageAuditTask, 0)
if newStreamer.Cover != nil {
rollBackFunc := func() error {
return _DefaultStreamer.OpUpdate(ctx, &streamerproto.OpUpdateReq{
Streamer: &dbstruct.Streamer{
Mid: oldStreamer.Mid,
Cover: oldStreamer.Cover,
},
})
}
tasks = append(tasks, imageaudit.NewImageAuditTaskControlBlock(&dbstruct.ImageAuditTask{
tasks = append(tasks, &dbstruct.ImageAuditTask{
RouteUrl: goproto.String(ctx.Request.URL.Path),
AssociativeDatabase: goproto.String("streamer"),
AssociativeTableName: goproto.String("streamer"),
@ -60,20 +43,11 @@ func (s *Service) CreateUpdateStreamerImageAudit(ctx *gin.Context, oldStreamer *
AssociativeTableColumn: goproto.String("cover"),
AuditedMedia: newStreamer.Cover,
OldMedia: oldStreamer.Cover,
}, rollBackFunc))
})
}
if newStreamer.Album != nil {
rollBackFunc := func() error {
return _DefaultStreamer.OpUpdate(ctx, &streamerproto.OpUpdateReq{
Streamer: &dbstruct.Streamer{
Mid: oldStreamer.Mid,
Album: oldStreamer.Album,
},
})
}
tasks = append(tasks, imageaudit.NewImageAuditTaskControlBlock(&dbstruct.ImageAuditTask{
tasks = append(tasks, &dbstruct.ImageAuditTask{
RouteUrl: goproto.String(ctx.Request.URL.Path),
AssociativeDatabase: goproto.String("streamer"),
AssociativeTableName: goproto.String("streamer"),
@ -81,7 +55,7 @@ func (s *Service) CreateUpdateStreamerImageAudit(ctx *gin.Context, oldStreamer *
AssociativeTableColumn: goproto.String("album"),
AuditedMedia: newStreamer.Album,
OldMedia: oldStreamer.Album,
}, rollBackFunc))
})
}
return

View File

@ -0,0 +1,149 @@
package service
import (
"context"
"fmt"
"service/api/consts"
"service/api/errcode"
accountproto "service/api/proto/account/proto"
daily_statementproto "service/api/proto/daily_statement/proto"
streamerproto "service/api/proto/streamer/proto"
vasproto "service/api/proto/vas/proto"
"service/bizcommon/util"
"service/dbstruct"
"service/library/contentaudit/textaudit"
"service/library/logger"
"service/library/redis"
"time"
"github.com/gin-gonic/gin"
"github.com/go-co-op/gocron"
xxl "github.com/xxl-job/xxl-job-executor-go"
goproto "google.golang.org/protobuf/proto"
)
func (s *CronService) ReloadRecommList(ctx context.Context, param *xxl.RunReq) (msg string) {
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
logger.Info("Refreshing recommendation list cached in redis...")
list, ec := DefaultService.OpGetStreamerList(&gin.Context{}, &streamerproto.OpListReq{
Sort: "-fans",
})
if ec != errcode.ErrCodeAccountRelationSrvOk {
logger.Error("OpGetStreamerList fail, ec: %v", ec)
return fmt.Sprintf("OpGetStreamerList fail, ec: %v", ec)
}
midList := make([]int64, len(list))
for i, streamer := range list {
midList[i] = util.DerefInt64(streamer.Mid)
}
err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", midList, 0)
if err != nil {
logger.Error("Redis cache fail, err: %v", err)
return fmt.Sprintf("Redis cache fail, err: %v", err)
}
logger.Info("Refresh recommendation list cached in redis accomplished...")
return "Refresh recommendation list cached in redis accomplished"
}
func (s *CronService) ClearVeriCodeSendTimes(ctx context.Context, param *xxl.RunReq) (msg string) {
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
logger.Info("Clearing vericode_send_times collection...")
if err := _DefaultVeriCodeSendTimes.OpClear(&gin.Context{}); err != nil {
logger.Error("Clear vericode_send_times collection fail: %v", err)
return fmt.Sprintf("Clear vericode_send_times collection fail: %v", err)
}
logger.Info("vericode_send_times collection has been cleared")
return "vericode_send_times collection has been cleared"
}
// 统计每日报表
func (s *CronService) CreateDailyStatement(ctx context.Context, param *xxl.RunReq) (msg string) {
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
//拿到现在的时间戳
nowTimeStamp := util.GetHourStartTimeStamp(time.Now())
//获取上个小时时间段
startTimeStamp := nowTimeStamp - int64(60*60)
endTimeStamp := nowTimeStamp - int64(1)
starttime := time.Unix(startTimeStamp, 0)
endtime := time.Unix(endTimeStamp, 0)
//获取H5接口访问量
logpath := fmt.Sprintf("%v_%02d%02d%02d.Global", s.fileAbsPath, starttime.Year(), starttime.Month(), starttime.Day())
count, err := DefaultScriptsService.QueryCallCount(consts.H5CallUrl, logpath)
if err != nil {
logger.Error("query h5 call count fail : %v", err)
return fmt.Sprintf("query h5 call count fail : %v", err)
}
//获取用户总量
accountCount, err := _DefaultAccount.OpCount(&gin.Context{}, &accountproto.OpCountReq{
CtLowerBound: goproto.Int64(int64(consts.AppEnterProductionTime)),
})
if err != nil {
logger.Error("_DefaultAccount OpCount fail : %v", err)
return fmt.Sprintf("_DefaultAccount OpCount fail : %v", err)
}
//获取订单总量
orderCounts, err := _DefaultVas.GetOrderCountGroupByStatus(&gin.Context{}, &vasproto.GetOrderByStatusReq{
CtStart: goproto.Int64(consts.AppEnterProductionTime),
CtEnd: nil,
})
if err != nil {
logger.Error("_DefaultVas GetOrderCountByStatus fail : %v", err)
return fmt.Sprintf("_DefaultVas GetOrderCountByStatus fail : %v", err)
}
finishedOrderCount := int32(0)
allOrderCount := int32(0)
for _, orderCount := range orderCounts {
if util.DerefInt32(orderCount.OrderStatus) != 0 {
finishedOrderCount += util.DerefInt32(orderCount.Count)
}
allOrderCount += util.DerefInt32(orderCount.Count)
}
dailyStatement := &dbstruct.DailyStatement{
H5CallCount: goproto.Int64(int64(count)),
RegisteredUserCount: goproto.Int64(accountCount),
OrderCreatedCount: goproto.Int64(int64(allOrderCount)),
OrderFinishedCount: goproto.Int64(int64(finishedOrderCount)),
StartTime: goproto.Int64(startTimeStamp),
EndTime: goproto.Int64(endTimeStamp),
}
err = _DefaultDailyStatement.OpCreate(&gin.Context{}, &daily_statementproto.OpCreateReq{
DailyStatement: dailyStatement,
})
if err != nil {
logger.Error("_DefaultDailyStatement OpCreate fail : %v", err)
return fmt.Sprintf("_DefaultDailyStatement OpCreate fail : %v", err)
}
logger.Info("%v - %v statement data has created...", starttime, endtime)
return fmt.Sprintf("%v - %v statement data has created...", starttime, endtime)
}
func (s *CronService) ImageAuditBatch() {
scheduler := gocron.NewScheduler(time.UTC)
scheduler.Every(10).Second().Do(func() {
// imageaudit.GiveNoticeToBatch()
})
scheduler.StartAsync()
}
// func (s *CronService) ImageAuditBatch() {
// scheduler := gocron.NewScheduler(time.UTC)
// scheduler.Every(10).Second().Do(func() {
// imageaudit.GiveNoticeToBatch()
// })
// scheduler.StartAsync()
// }
func (s *CronService) TextAuditBatch() {
scheduler := gocron.NewScheduler(time.UTC)
scheduler.Every(10).Second().Do(func() {
textaudit.GiveNoticeToBatch()
})
scheduler.StartAsync()
}

View File

@ -90,4 +90,11 @@ redis:
prefix: ""
server_info:
file_server_domain_name: "https://file.wishpaldev.tech/"
file_server_domain_name: "https://file.wishpaldev.tech/"
xxl_job:
server_addr: "http://127.0.0.1:9800/xxl-job-admin"
access_token: "default_token"
executor_ip: "127.0.0.1"
executor_port: "9801"
registry_key: "golang-jobs-executor"

View File

@ -134,6 +134,15 @@ type ServerInfoConfig struct {
FileServerDomainName string `json:"file_server_domain_name" yaml:"file_server_domain_name"`
}
// xxl-job服务配置
type XxlJobConfig struct {
ServerAddr string `json:"server_addr" yaml:"server_addr"`
AccessToken string `json:"access_token" yaml:"access_token"`
ExecutorIp string `json:"executor_ip" yaml:"executor_ip"`
ExecutorPort string `json:"executor_port" yaml:"executor_port"`
RegistryKey string `json:"registry_key" yaml:"registry_key"`
}
func LoadConfig(configFilePath string, cfg interface{}) error {
cfgStr, err := ioutil.ReadFile(configFilePath)
if err != nil {

View File

@ -69,15 +69,15 @@ func Init(cfg *configcenter.ImageAuditConfig) (err error) {
// 初始化调度器
initScheduler(cfg)
// 启动调度任务
defaultImageAuditTaskScheduler.Run()
// // 启动调度任务
// defaultImageAuditTaskScheduler.Run()
return
}
func GiveNoticeToBatch() {
defaultImageAuditTaskScheduler.GiveNoticeToBatch()
}
// func GiveNoticeToBatch() {
// defaultImageAuditTaskScheduler.GiveNoticeToBatch()
// }
func ConnectToImageAuditService(serivce ImageAuditService) {
_DefaultImageAudit = serivce

View File

@ -0,0 +1,59 @@
package imageaudit
import (
"fmt"
"service/api/consts"
"service/bizcommon/util"
"service/dbstruct"
goproto "google.golang.org/protobuf/proto"
)
// 图像审核任务控制块
// ActionId设计初衷由于图像审核是定时任务触发的批量作业如果在一次作业间隔有针对同一个图像媒体的多次更新则会提交关于它的多次审核需要配合乐观锁保证数据一致性
type ImageAuditTaskControlBlock struct {
// 静态元素
ActionId string // 审核动作id号由图像审核实体数据库四要素拼接而成用于指示对数据库-表-单条数据-图像字段的审核动作
ImageAuditTask *dbstruct.ImageAuditTask // 审核任务
RollbackFunc func() error // 审核失败时的回退方法
// 操作对象
Images []*dbstruct.MediaComponent // 被审核的图像序列
// 动态元素
IsTaskPassed bool // 任务状态,仅当任务已确定完成,即已完成审核任务的分片数 = 审核任务分片数时,才有意义
AuditedFragmentsNum int // 已完成审核任务的分片数
IsGivingNoticeToBatch bool // 是否进行批处理
}
// 新建图像审核任务块
func NewImageAuditTaskControlBlock(task *dbstruct.ImageAuditTask, rollbackFunc func() error) (tcb *ImageAuditTaskControlBlock) {
if task == nil || task.AuditedMedia == nil {
return
}
fragmentNum := len(util.DerefInt64Slice(task.AuditedMedia.ImageIds))
if fragmentNum == 0 {
return
}
task.BatchId = goproto.String(defaultImageAuditTaskScheduler.batchId)
task.Status = goproto.Int64(consts.ImageAudit_Created)
if fragmentNum == 1 {
task.IsFragmented = goproto.Int64(0)
task.FragmentsNum = goproto.Int64(1)
} else {
task.IsFragmented = goproto.Int64(1)
task.FragmentsNum = goproto.Int64(int64(fragmentNum))
}
tcb = &ImageAuditTaskControlBlock{
ActionId: fmt.Sprintf("%v%v%v%v", util.DerefString(task.AssociativeDatabase), util.DerefString(task.AssociativeTableName),
util.DerefInt64(task.AssociativeTableId), util.DerefString(task.AssociativeTableColumn)),
ImageAuditTask: task,
RollbackFunc: rollbackFunc,
IsTaskPassed: true,
AuditedFragmentsNum: 0,
IsGivingNoticeToBatch: false,
}
return
}

View File

@ -19,6 +19,20 @@ import (
"github.com/gin-gonic/gin"
)
// 图像审核主逻辑
func Run() (err error) {
// 更新batchId
batchId := defaultImageAuditTaskScheduler.batchId
defaultImageAuditTaskScheduler.batchId = genereteBatchId()
// 查询该批次所有审核任务
imageaudittasks, err := _DefaultImageAuditTask.OpList(&gin.Context{}, &imageaudittaskproto.OpListReq{
BatchId: goproto.String(batchId),
Status: goproto.Int64(consts.ImageAudit_Created),
})
}
// 图像审核主逻辑
func executeImageAuditTasks(tasks []*ImageAuditTaskControlBlock, batchId string) (err error) {

View File

@ -1,7 +1,6 @@
package imageaudit
import (
"fmt"
"service/api/consts"
imageauditproto "service/api/proto/imageaudit/proto"
imageaudittaskproto "service/api/proto/imageaudittask/proto"
@ -13,86 +12,49 @@ import (
goproto "google.golang.org/protobuf/proto"
)
// 图像审核任务控制块
// ActionId设计初衷由于图像审核是定时任务触发的批量作业如果在一次作业间隔有针对同一个图像媒体的多次更新则会提交关于它的多次审核需要配合乐观锁保证数据一致性
type ImageAuditTaskControlBlock struct {
// 静态元素
ActionId string // 审核动作id号由图像审核实体数据库四要素拼接而成用于指示对数据库-表-单条数据-图像字段的审核动作
ImageAuditTask *dbstruct.ImageAuditTask // 审核任务
RollbackFunc func() error // 审核失败时的回退方法
// 操作对象
Images []*dbstruct.MediaComponent // 被审核的图像序列
// 动态元素
IsTaskPassed bool // 任务状态,仅当任务已确定完成,即已完成审核任务的分片数 = 审核任务分片数时,才有意义
AuditedFragmentsNum int // 已完成审核任务的分片数
IsGivingNoticeToBatch bool // 是否进行批处理
func AddTasks(tasks []*dbstruct.ImageAuditTask) error {
for _, task := range tasks {
err := AddTask(task)
if err != nil {
return err
}
}
return nil
}
// 新建图像审核任务块
func NewImageAuditTaskControlBlock(task *dbstruct.ImageAuditTask, rollbackFunc func() error) (tcb *ImageAuditTaskControlBlock) {
func AddTask(task *dbstruct.ImageAuditTask) error {
if task == nil || task.AuditedMedia == nil {
return
return nil
}
fragmentNum := len(util.DerefInt64Slice(task.AuditedMedia.ImageIds))
if fragmentNum == 0 {
return
return nil
}
task.BatchId = goproto.String(defaultImageAuditTaskScheduler.batchId)
task.Status = goproto.Int64(consts.ImageAudit_Created)
if fragmentNum == 1 {
task.IsFragmented = goproto.Int64(0)
task.FragmentsNum = goproto.Int64(1)
// 写入图像审核表
if err := prepareFragmentedImageAuditTask(task); err != nil {
return err
}
} else {
task.IsFragmented = goproto.Int64(1)
task.FragmentsNum = goproto.Int64(int64(fragmentNum))
}
tcb = &ImageAuditTaskControlBlock{
ActionId: fmt.Sprintf("%v%v%v%v", util.DerefString(task.AssociativeDatabase), util.DerefString(task.AssociativeTableName),
util.DerefInt64(task.AssociativeTableId), util.DerefString(task.AssociativeTableColumn)),
ImageAuditTask: task,
RollbackFunc: rollbackFunc,
IsTaskPassed: true,
AuditedFragmentsNum: 0,
IsGivingNoticeToBatch: false,
}
return
}
func AddTask(task *ImageAuditTaskControlBlock) {
if err := task.Prepare(); err != nil {
logger.Error("ImageAuditTaskPrepareException: %v", err)
return
}
defaultImageAuditTaskScheduler.AddTask(task)
}
// 将图像审核任务信息写入数据库,准备送审
func (task *ImageAuditTaskControlBlock) Prepare() (err error) {
if task == nil {
return fmt.Errorf("task is nil, check your input")
}
// 1.写入图像审核表
if util.DerefInt64(task.ImageAuditTask.IsFragmented) == 1 {
if task.Images, err = prepareFragmentedImageAuditTask(task.ImageAuditTask); err != nil {
// 写入图像审核表
if err := prepareNotFragmentedImageAuditTask(task); err != nil {
return err
}
} else {
if err := prepareNotFragmentedImageAuditTask(task.ImageAuditTask); err != nil {
return err
}
task.Images = make([]*dbstruct.MediaComponent, 1)
task.Images[0] = task.ImageAuditTask.AuditedMedia
}
// 2.写入图像审核任务表
// 写入图像审核任务表
if err := _DefaultImageAuditTask.OpCreate(&gin.Context{}, &imageaudittaskproto.OpCreateReq{
ImageAuditTask: task.ImageAuditTask,
ImageAuditTask: task,
}); err != nil {
logger.Error("Imageaudittask OpCreate failed: %v", err)
return err
@ -102,21 +64,20 @@ func (task *ImageAuditTaskControlBlock) Prepare() (err error) {
}
// 根据任务信息创建单个图片的图片审核并存入数据库返回images
func prepareFragmentedImageAuditTask(task *dbstruct.ImageAuditTask) (images []*dbstruct.MediaComponent, err error) {
func prepareFragmentedImageAuditTask(task *dbstruct.ImageAuditTask) (err error) {
ctx := &gin.Context{}
fragmentsNum := util.DerefInt64(task.FragmentsNum)
images = make([]*dbstruct.MediaComponent, fragmentsNum)
imageAudits := make([]*dbstruct.ImageAudit, fragmentsNum)
// 将媒体拆分为单个图像并存入图片审核表
imageIds := util.DerefInt64Slice(task.AuditedMedia.ImageIds)
for i, imageId := range imageIds {
images[i] = &dbstruct.MediaComponent{
image := &dbstruct.MediaComponent{
ImageIds: util.Int64Slice([]int64{imageId}),
}
imageAudits[i] = &dbstruct.ImageAudit{
AuditedMedia: images[i],
AuditedMedia: image,
BatchId: task.BatchId,
Status: goproto.Int64(consts.ImageAudit_Created),
}

View File

@ -3,7 +3,6 @@ package imageaudit
import (
"fmt"
"service/library/configcenter"
"service/library/logger"
"time"
)
@ -11,12 +10,8 @@ var defaultImageAuditTaskScheduler *ImageAuditTaskScheduler
// 图像审核任务调度器
type ImageAuditTaskScheduler struct {
// 缓冲池、同步标志
taskBuffer chan *ImageAuditTaskControlBlock // 缓存待处理任务的缓冲池
batchFlag chan bool // 批处理同步标志
// 数据存储
taskPacket []*ImageAuditTaskControlBlock // 一次批处理的任务包
// // 同步标志
batchFlag chan bool // 批处理同步标志
// 状态记录
batchId string // 当前批次号
@ -24,26 +19,14 @@ type ImageAuditTaskScheduler struct {
func initScheduler(cfg *configcenter.ImageAuditConfig) {
defaultImageAuditTaskScheduler = &ImageAuditTaskScheduler{
taskBuffer: make(chan *ImageAuditTaskControlBlock, cfg.TaskBufferSize),
batchFlag: make(chan bool, 1),
taskPacket: make([]*ImageAuditTaskControlBlock, 0),
batchId: genereteBatchId(),
// taskBuffer: make(chan *ImageAuditTaskControlBlock, cfg.TaskBufferSize),
batchFlag: make(chan bool, 1),
// taskPacket: make([]*ImageAuditTaskControlBlock, 0),
batchId: genereteBatchId(),
}
defaultImageAuditTaskScheduler.batchFlag <- true
}
// 将任务加入缓冲池
func (s *ImageAuditTaskScheduler) AddTask(task *ImageAuditTaskControlBlock) {
s.taskBuffer <- task
}
// 通知进行批处理
func (s *ImageAuditTaskScheduler) GiveNoticeToBatch() {
s.taskBuffer <- &ImageAuditTaskControlBlock{
IsGivingNoticeToBatch: true,
}
}
// 批处理上锁
func (s *ImageAuditTaskScheduler) lock() {
<-s.batchFlag
@ -54,46 +37,46 @@ func (s *ImageAuditTaskScheduler) unLock() {
s.batchFlag <- true
}
// 调度方法
func (s *ImageAuditTaskScheduler) Run() {
go func() {
// 1.循环取出缓冲池中的任务
// 2.如果任务标志进行批处理,则将任务包打包,进行审核作业,最后清空任务包,生成新的批次号
// 3.否则,将任务加入任务包中,并为任务建立索引,记录任务执行顺序
for {
task := <-s.taskBuffer
if task.IsGivingNoticeToBatch {
// 上锁
s.lock()
// // 调度方法
// func (s *ImageAuditTaskScheduler) Run() {
// go func() {
// // 1.循环取出缓冲池中的任务
// // 2.如果任务标志进行批处理,则将任务包打包,进行审核作业,最后清空任务包,生成新的批次号
// // 3.否则,将任务加入任务包中,并为任务建立索引,记录任务执行顺序
// for {
// task := <-s.taskBuffer
// if task.IsGivingNoticeToBatch {
// // 上锁
// s.lock()
// 打包任务
packet := s.taskPacket
s.taskPacket = make([]*ImageAuditTaskControlBlock, 0)
// // 打包任务
// packet := s.taskPacket
// s.taskPacket = make([]*ImageAuditTaskControlBlock, 0)
// 取出状态并重置
batchId := s.batchId
s.batchId = genereteBatchId()
// 执行审核作业
go func() {
err := executeImageAuditTasks(packet, batchId)
if err != nil {
logger.Error("Batch failed : %v", err)
}
// 解锁
s.unLock()
}()
// // 取出状态并重置
// batchId := s.batchId
// s.batchId = genereteBatchId()
// // 执行审核作业
// go func() {
// err := executeImageAuditTasks(packet, batchId)
// if err != nil {
// logger.Error("Batch failed : %v", err)
// }
// // 解锁
// s.unLock()
// }()
} else {
s.taskPacket = append(s.taskPacket, task)
}
}
}()
}
// } else {
// s.taskPacket = append(s.taskPacket, task)
// }
// }
// }()
// }
// 生成批次号
func genereteBatchId() string {
now := time.Now()
y, m, d := now.Date()
h, mi, s := now.Clock()
return fmt.Sprintf("%d%d%d%d%d%02d", y, m, d, h, mi, s)
return fmt.Sprintf("%d%2d%2d%2d%2d%02d", y, m, d, h, mi, s)
}

View File

@ -9,6 +9,7 @@ import (
"service/library/logger"
"service/library/mycrypto"
"testing"
"time"
)
func Test(t *testing.T) {
@ -25,20 +26,20 @@ func Test(t *testing.T) {
logger.Error("cryptoService init, err: %v", err)
}
mobilePhone := "Loum3xCiIgJ/tnp+AdxbIw=="
base64DecryptedBytes, _ := base64.StdEncoding.DecodeString(mobilePhone)
phone, _ := mycrypto.CryptoServiceInstance().AES.Decrypt(base64DecryptedBytes)
fmt.Println(string(phone))
hash := mycrypto.CryptoServiceInstance().SHA256.Encrypt(phone)
fmt.Println(string(hash))
// mobilePhone := "17738729985"
// rsaBytes, _ := mycrypto.CryptoServiceInstance().RSA.Encrypt([]byte(mobilePhone))
// base64EncryptedBytes := make([]byte, base64.StdEncoding.EncodedLen(len(rsaBytes)))
// base64.StdEncoding.Encode(base64EncryptedBytes, rsaBytes)
// phoneHash := mycrypto.CryptoServiceInstance().SHA256.Encrypt([]byte(mobilePhone))
// mobilePhone := "Loum3xCiIgJ/tnp+AdxbIw=="
// base64DecryptedBytes, _ := base64.StdEncoding.DecodeString(mobilePhone)
// phone, _ := mycrypto.CryptoServiceInstance().AES.Decrypt(base64DecryptedBytes)
// fmt.Println(string(phone))
// hash := mycrypto.CryptoServiceInstance().SHA256.Encrypt(phone)
// fmt.Println(string(hash))
mobilePhone := "19900000009"
rsaBytes, _ := mycrypto.CryptoServiceInstance().RSA.Encrypt([]byte(mobilePhone))
base64EncryptedBytes := make([]byte, base64.StdEncoding.EncodedLen(len(rsaBytes)))
base64.StdEncoding.Encode(base64EncryptedBytes, rsaBytes)
phoneHash := mycrypto.CryptoServiceInstance().SHA256.Encrypt([]byte(mobilePhone))
// fmt.Printf("Time:%v\n", time.Now().Unix())
// fmt.Printf("RSA:%v\n", string(base64EncryptedBytes))
// fmt.Printf("PhoneHash:%v\n", phoneHash)
fmt.Printf("Time:%v\n", time.Now().Unix())
fmt.Printf("RSA:%v\n", string(base64EncryptedBytes))
fmt.Printf("PhoneHash:%v\n", phoneHash)
}

View File

@ -288,7 +288,6 @@ func (e *executor) registry() {
e.log.Error("执行器注册失败3:" + string(body))
return
}
e.log.Info("执行器注册成功:" + string(body))
}()
}