service/app/mix/service/xxljob_tasks.go

300 lines
12 KiB
Go

package service
import (
"context"
"fmt"
"service/api/consts"
"service/api/errcode"
accountproto "service/api/proto/account/proto"
contact_customer_service_proto "service/api/proto/contact_customer_service/proto"
daily_statementproto "service/api/proto/daily_statement/proto"
streamerproto "service/api/proto/streamer/proto"
vasproto "service/api/proto/vas/proto"
"service/bizcommon/util"
"service/dbstruct"
"service/library/contentaudit/imageaudit"
"service/library/contentaudit/textaudit"
"service/library/dingtalk"
"service/library/logger"
"service/library/redis"
"strings"
"time"
"github.com/gin-gonic/gin"
xxl "github.com/xxl-job/xxl-job-executor-go"
goproto "google.golang.org/protobuf/proto"
)
// ReloadRecommList is an xxl-job task handler that rebuilds the streamer
// recommendation list cached in redis.
//
// It calls DefaultService.OpGetStreamerList with Sort set to "-fans", collects
// the Mid of every returned streamer (in the returned order) and stores the
// resulting slice under consts.RedisStreamerPrefix+"recomm_list", passing 0 as
// the last argument of Set (presumably "no expiration" — confirm against the
// redis wrapper).
//
// The returned string is the task's execution message: a success note, or a
// description of the step that failed. ctx is not used; the downstream call
// receives a fresh, empty gin.Context.
func (s *CronService) ReloadRecommList(ctx context.Context, param *xxl.RunReq) (msg string) {
	logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
	logger.Info("Refreshing recommendation list cached in redis...")

	req := &streamerproto.OpListReq{
		Sort: "-fans",
	}
	streamers, ec := DefaultService.OpGetStreamerList(&gin.Context{}, req)
	// NOTE(review): the success code compared here belongs to the account
	// relation service although the call targets the streamer list — confirm
	// this is the code OpGetStreamerList actually returns on success.
	if ec != errcode.ErrCodeAccountRelationSrvOk {
		logger.Error("OpGetStreamerList fail, ec: %v", ec)
		return fmt.Sprintf("OpGetStreamerList fail, ec: %v", ec)
	}

	// Flatten the streamers into their mids, keeping the order of the list.
	mids := make([]int64, 0, len(streamers))
	for idx := range streamers {
		mids = append(mids, util.DerefInt64(streamers[idx].Mid))
	}

	cacheKey := consts.RedisStreamerPrefix + "recomm_list"
	if err := redis.GetRedisClient().Set(cacheKey, mids, 0); err != nil {
		logger.Error("Redis cache fail, err: %v", err)
		return fmt.Sprintf("Redis cache fail, err: %v", err)
	}

	logger.Info("Refresh recommendation list cached in redis accomplished...")
	return "Refresh recommendation list cached in redis accomplished"
}
// ClearVeriCodeSendTimes is an xxl-job task handler that clears the
// vericode_send_times collection by calling _DefaultVeriCodeSendTimes.OpClear.
//
// The returned string is the task's execution message: a confirmation on
// success, or the failure description when OpClear returns an error. ctx is
// not used; OpClear receives a fresh, empty gin.Context.
func (s *CronService) ClearVeriCodeSendTimes(ctx context.Context, param *xxl.RunReq) (msg string) {
	logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
	logger.Info("Clearing vericode_send_times collection...")

	clearErr := _DefaultVeriCodeSendTimes.OpClear(&gin.Context{})
	if clearErr != nil {
		logger.Error("Clear vericode_send_times collection fail: %v", clearErr)
		return fmt.Sprintf("Clear vericode_send_times collection fail: %v", clearErr)
	}

	logger.Info("vericode_send_times collection has been cleared")
	return "vericode_send_times collection has been cleared"
}
// ClearMomentCreateTimes is an xxl-job task handler that clears the
// moment_create_times collection by calling _DefaultMomentCreateTimes.OpClear.
//
// The returned string is the task's execution message: a confirmation on
// success, or the failure description when OpClear returns an error. ctx is
// not used; OpClear receives a fresh, empty gin.Context.
func (s *CronService) ClearMomentCreateTimes(ctx context.Context, param *xxl.RunReq) (msg string) {
	logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
	logger.Info("Clearing moment_create_times collection...")

	clearErr := _DefaultMomentCreateTimes.OpClear(&gin.Context{})
	if clearErr != nil {
		logger.Error("Clear moment_create_times collection fail: %v", clearErr)
		return fmt.Sprintf("Clear moment_create_times collection fail: %v", clearErr)
	}

	logger.Info("moment_create_times collection has been cleared")
	return "moment_create_times collection has been cleared"
}
// CreateDailyStatement is an xxl-job task handler that builds the statistics
// statement for the previous clock hour and persists it.
//
// The statement combines:
//   - the H5 call count read from the log file of the window's start date,
//   - the number of accounts created since consts.AppEnterProductionTime,
//   - the total and finished order counts since consts.AppEnterProductionTime,
//   - the number of new users of the window per source (Android, iOS, H5).
//
// The returned string is the task's execution message: a success note naming
// the window, or a description of the first step that failed. ctx is not
// used; downstream calls receive a fresh, empty gin.Context.
func (s *CronService) CreateDailyStatement(ctx context.Context, param *xxl.RunReq) (msg string) {
	logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))

	const secondsPerHour int64 = 60 * 60

	// The window is the hour before the current one, expressed as inclusive
	// timestamps [hourStart-3600, hourStart-1].
	hourStart := util.GetHourStartTimeStamp(time.Now())
	windowStart := hourStart - secondsPerHour
	windowEnd := hourStart - 1
	windowStartTime := time.Unix(windowStart, 0)

	// H5 endpoint call count, taken from the log file named after the date
	// on which the window starts.
	logFile := fmt.Sprintf("%v_%02d%02d%02d.Global", s.fileAbsPath, windowStartTime.Year(), windowStartTime.Month(), windowStartTime.Day())
	h5Calls, err := DefaultScriptsService.QueryCallCount(consts.H5CallUrl, logFile)
	if err != nil {
		logger.Error("query h5 call count fail : %v", err)
		return fmt.Sprintf("query h5 call count fail : %v", err)
	}

	// Total number of registered accounts.
	accountReq := &accountproto.OpCountReq{
		CtLowerBound: goproto.Int64(int64(consts.AppEnterProductionTime)),
	}
	registered, err := _DefaultAccount.OpCount(&gin.Context{}, accountReq)
	if err != nil {
		logger.Error("_DefaultAccount OpCount fail : %v", err)
		return fmt.Sprintf("_DefaultAccount OpCount fail : %v", err)
	}

	// Order counts grouped by status; CtEnd is left unset (nil).
	orderReq := &vasproto.GetOrderByStatusReq{
		CtStart: goproto.Int64(consts.AppEnterProductionTime),
	}
	orderCounts, err := _DefaultVas.GetOrderCountGroupByStatus(&gin.Context{}, orderReq)
	if err != nil {
		logger.Error("_DefaultVas GetOrderCountByStatus fail : %v", err)
		return fmt.Sprintf("_DefaultVas GetOrderCountByStatus fail : %v", err)
	}

	// Every group counts towards the total; groups whose status is non-zero
	// additionally count as finished.
	var ordersTotal, ordersFinished int32
	for _, group := range orderCounts {
		n := util.DerefInt32(group.Count)
		ordersTotal += n
		if util.DerefInt32(group.OrderStatus) != 0 {
			ordersFinished += n
		}
	}

	// Registration source of the users created inside the window.
	sourceReq := &accountproto.OpCountReq{
		CtLowerBound: goproto.Int64(windowStart),
		CtUpperBound: goproto.Int64(windowEnd),
	}
	fromAndroid, fromIos, fromH5, err := _DefaultAccount.OpCountLastHourNewUserSource(&gin.Context{}, sourceReq)
	if err != nil {
		logger.Error("_DefaultAccount OpCountLastHourNewUserSource fail : %v", err)
		return fmt.Sprintf("_DefaultAccount OpCountLastHourNewUserSource fail : %v", err)
	}

	createReq := &daily_statementproto.OpCreateReq{
		DailyStatement: &dbstruct.DailyStatement{
			H5CallCount:                     goproto.Int64(int64(h5Calls)),
			RegisteredUserCount:             goproto.Int64(registered),
			OrderCreatedCount:               goproto.Int64(int64(ordersTotal)),
			OrderFinishedCount:              goproto.Int64(int64(ordersFinished)),
			LastHourNewUserCountFromAndroid: goproto.Int64(fromAndroid),
			LastHourNewUserCountFromIos:     goproto.Int64(fromIos),
			LastHourNewUserCountFromH5:      goproto.Int64(fromH5),
			StartTime:                       goproto.Int64(windowStart),
			EndTime:                         goproto.Int64(windowEnd),
		},
	}
	if err = _DefaultDailyStatement.OpCreate(&gin.Context{}, createReq); err != nil {
		logger.Error("_DefaultDailyStatement OpCreate fail : %v", err)
		return fmt.Sprintf("_DefaultDailyStatement OpCreate fail : %v", err)
	}

	windowEndTime := time.Unix(windowEnd, 0)
	logger.Info("%v - %v statement data has created...", windowStartTime, windowEndTime)
	return fmt.Sprintf("%v - %v statement data has created...", windowStartTime, windowEndTime)
}
// ImageAuditBatch is an xxl-job task handler that runs one image audit batch.
//
// It obtains a batch ID from imageaudit.RefreshBatchId, runs the audit for
// that batch and then synchronizes the audit results of the batch. A failure
// of the audit run does not skip the synchronization step.
//
// The returned string is the task's execution message: the outcome of the
// run (with success and failure counts), followed by an error entry when the
// synchronization fails. ctx is not used; the synchronization call receives a
// fresh, empty gin.Context.
func (s *CronService) ImageAuditBatch(ctx context.Context, param *xxl.RunReq) (msg string) {
	logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))

	var report strings.Builder

	// Obtain the batch ID for this execution.
	batchID := imageaudit.RefreshBatchId()

	// Run the image audit and record its outcome.
	okCount, badCount, runErr := imageaudit.Run(batchID)
	if runErr == nil {
		fmt.Fprintf(&report, "batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchID, okCount, badCount)
	} else {
		fmt.Fprintf(&report, "ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchID, okCount, badCount, runErr)
	}

	// Synchronize the audit results regardless of how the run ended.
	if syncErr := DefaultService.utilSyncImageAuditTaskResultByBatchId(&gin.Context{}, batchID); syncErr != nil {
		fmt.Fprintf(&report, "ERROR : batchId : %v, image audit tasks of this batchId have failed to syncronize, err :%v", batchID, syncErr)
	}

	logger.Info("Image audit batch ends...")
	return report.String()
}
// ImageAuditBatchHis is an xxl-job task handler that runs the image audit for
// a batch ID supplied by the caller.
//
// Unlike ImageAuditBatch it does not generate a batch ID: the executor params
// string is used verbatim as the batch ID. It runs the audit for that batch
// and then synchronizes the audit results of the batch. A failure of the
// audit run does not skip the synchronization step.
//
// The returned string is the task's execution message: the outcome of the
// run (with success and failure counts), followed by an error entry when the
// synchronization fails. ctx is not used; the synchronization call receives a
// fresh, empty gin.Context.
func (s *CronService) ImageAuditBatchHis(ctx context.Context, param *xxl.RunReq) (msg string) {
	logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))

	var report strings.Builder

	// The batch ID is taken from the executor params.
	batchID := param.ExecutorParams

	// Run the image audit and record its outcome.
	okCount, badCount, runErr := imageaudit.Run(batchID)
	if runErr == nil {
		fmt.Fprintf(&report, "batchId : %v, image audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchID, okCount, badCount)
	} else {
		fmt.Fprintf(&report, "ERROR : batchId : %v, image audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v ; ", batchID, okCount, badCount, runErr)
	}

	// Synchronize the audit results regardless of how the run ended.
	if syncErr := DefaultService.utilSyncImageAuditTaskResultByBatchId(&gin.Context{}, batchID); syncErr != nil {
		fmt.Fprintf(&report, "ERROR : batchId : %v, image audit tasks of this batchId have failed to syncronize, err :%v", batchID, syncErr)
	}

	logger.Info("Image audit batch ends...")
	return report.String()
}
// TextAuditBatch is an xxl-job task handler that runs one text audit batch.
//
// It obtains a batch ID from textaudit.RefreshBatchId, runs the audit for
// that batch and then synchronizes the audit results of the batch. A failure
// of the audit run does not skip the synchronization step.
//
// The returned string is the task's execution message: the outcome of the
// run (with success and failure counts), followed by an error entry when the
// synchronization fails. ctx is not used; the synchronization call receives a
// fresh, empty gin.Context.
func (s *CronService) TextAuditBatch(ctx context.Context, param *xxl.RunReq) (msg string) {
	logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))

	var report strings.Builder

	// Obtain the batch ID for this execution.
	batchID := textaudit.RefreshBatchId()

	// Run the text audit and record its outcome.
	okCount, badCount, runErr := textaudit.Run(batchID)
	if runErr == nil {
		fmt.Fprintf(&report, "batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchID, okCount, badCount)
	} else {
		fmt.Fprintf(&report, "ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchID, okCount, badCount, runErr)
	}

	// Synchronize the audit results regardless of how the run ended.
	if syncErr := DefaultService.utilSyncTextAuditTaskResultByBatchId(&gin.Context{}, batchID); syncErr != nil {
		fmt.Fprintf(&report, "ERROR : batchId : %v, text audit tasks of this batchId have failed to syncronize, err :%v", batchID, syncErr)
	}

	logger.Info("Text audit batch ends...")
	return report.String()
}
// TextAuditBatchHis is an xxl-job task handler that runs the text audit for a
// batch ID supplied by the caller.
//
// Unlike TextAuditBatch it does not generate a batch ID: the executor params
// string is used verbatim as the batch ID. It runs the audit for that batch
// and then synchronizes the audit results of the batch. A failure of the
// audit run does not skip the synchronization step.
//
// The returned string is the task's execution message. It always starts with
// the outcome of the audit run (finished or failed, with success and failure
// counts) and is followed by an error entry when the synchronization fails.
// This matches the reporting of TextAuditBatch and ImageAuditBatchHis; the
// "have finished" entry is tied to the run result, not to the synchronization
// result, so a failed run is never reported as both failed and finished.
//
// ctx is not used; the synchronization call receives a fresh, empty
// gin.Context.
func (s *CronService) TextAuditBatchHis(ctx context.Context, param *xxl.RunReq) (msg string) {
	logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))

	var handleMsg strings.Builder

	// The batch ID is taken from the executor params; no new ID is generated.
	batchId := param.ExecutorParams

	// Run the text audit and report its own outcome.
	successNum, failNum, err := textaudit.Run(batchId)
	if err != nil {
		fmt.Fprintf(&handleMsg, "ERROR : batchId : %v, text audit tasks of this batchId have failed, successNum: %v, failNum: %v, err :%v", batchId, successNum, failNum, err)
	} else {
		fmt.Fprintf(&handleMsg, "batchId : %v, text audit tasks of this batchId have finished, successNum: %v, failNum: %v", batchId, successNum, failNum)
	}

	// Synchronize the audit results regardless of how the run ended; only a
	// synchronization failure adds a further entry to the message.
	if err := DefaultService.utilSyncTextAuditTaskResultByBatchId(&gin.Context{}, batchId); err != nil {
		fmt.Fprintf(&handleMsg, "ERROR : batchId : %v, text audit tasks of this batchId have failed to syncronize, err :%v", batchId, err)
	}

	logger.Info("Text audit batch ends...")
	return handleMsg.String()
}
// ClearContentAuditBatchExecutionLogs is an xxl-job task handler that clears
// the execution logs of the image and text audit jobs (per the original note
// it is meant to be scheduled daily at 00:00).
//
// The executor params must have the form "<jobIds>|<errorPrefix>": the string
// is split on "|" and the first two fields are handed unchanged to
// _DefaultXxlJob.ClearSuccXxlJobLogs; any further fields are ignored.
//
// The returned string is the task's execution message: a format complaint
// when fewer than two fields are present, the error text when the clearing
// call fails, or a completion note. ctx is not used; the clearing call
// receives a fresh, empty gin.Context.
func (s *CronService) ClearContentAuditBatchExecutionLogs(ctx context.Context, param *xxl.RunReq) (msg string) {
	logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))

	fields := strings.Split(param.ExecutorParams, "|")
	if len(fields) < 2 {
		return "Wrong executor params format!"
	}
	jobIDs, errorPrefix := fields[0], fields[1]

	if clearErr := _DefaultXxlJob.ClearSuccXxlJobLogs(&gin.Context{}, jobIDs, errorPrefix); clearErr != nil {
		return clearErr.Error()
	}
	return "clear content audit batch execution logs finished..."
}
// SendContactCustomerServicesOfLastMinute is an xxl-job task handler that
// forwards the contact-customer-service records of the previous minute to
// DingTalk (per the original note it is meant to be scheduled every minute).
//
// It lists the records whose creation time lies in the inclusive window
// [minuteStart-60, minuteStart-1], where minuteStart comes from
// util.GetMinuteStartTimeStamp(time.Now()), turns them into a single text via
// DefaultService.utilStringifyContactCustomerServices and sends that text with
// dingtalk.SendMsg.
//
// The returned string is the task's execution message: the error text of the
// first failing step, a note that nothing new was found, or a success note.
// ctx is not used; downstream calls receive a fresh, empty gin.Context.
func (s *CronService) SendContactCustomerServicesOfLastMinute(ctx context.Context, param *xxl.RunReq) (msg string) {
	logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))

	const secondsPerMinute int64 = 60

	// The window is the minute before the current one.
	minuteStart := util.GetMinuteStartTimeStamp(time.Now())
	windowStart := minuteStart - secondsPerMinute
	windowEnd := minuteStart - 1

	listReq := &contact_customer_service_proto.OpListReq{
		CtLowerBound: goproto.Int64(windowStart),
		CtUpperBound: goproto.Int64(windowEnd),
	}
	records, err := _DefaultContactCustomerService.OpList(&gin.Context{}, listReq)
	switch {
	case err != nil:
		return err.Error()
	case len(records) == 0:
		// Nothing to forward for this minute.
		return "No new contact customer service created"
	}

	text, err := DefaultService.utilStringifyContactCustomerServices(&gin.Context{}, records)
	if err != nil {
		return err.Error()
	}

	if err = dingtalk.SendMsg(text); err != nil {
		return err.Error()
	}
	return "Message send success"
}