Merge pull request 'mainconflict' (#195) from mainconflict into main
Reviewed-on: http://121.41.31.146:3000/wishpal_ironfan/service/pulls/195
This commit is contained in:
commit
c9c66aba94
|
@ -15,6 +15,12 @@ const (
|
|||
DefaultPageSize = 10 //默认页长为10
|
||||
)
|
||||
|
||||
// 推荐每次吞吐量
|
||||
const (
|
||||
StreamerRecommThroughput = 4
|
||||
MomentRecommThroughput = 4
|
||||
)
|
||||
|
||||
// 设备类型
|
||||
const (
|
||||
DevType_Android = 0
|
||||
|
@ -68,9 +74,10 @@ const (
|
|||
// redis键前缀
|
||||
const (
|
||||
RedisStreamerPrefix = "streamer:" //streamer服务前缀
|
||||
RedisMomentPrefix = "moment:" //moment服务前缀
|
||||
)
|
||||
|
||||
//const PackageRootPath = "C:/Users/PC/Desktop/wishpal_ironfan_service/service"
|
||||
//const PackageRootPath = "C:/Users/PC/Desktop/service"
|
||||
|
||||
const PackageRootPath = "/app/wishpal-ironfan"
|
||||
|
||||
|
@ -78,7 +85,7 @@ const MainConfigPath = PackageRootPath + "/etc/mix/mix-test.yaml"
|
|||
|
||||
const ProductionConfigPath = PackageRootPath + "/etc/mix/mix-prod.yaml"
|
||||
|
||||
const LocalConfigPath = "/Users/erwin/wishpal/wishpal-ironfan/etc/mix/mix-local.yaml"
|
||||
const LocalConfigPath = "C:/Users/PC/Desktop/service/etc/mix/mix-local.yaml"
|
||||
|
||||
const ReservedUserIdRegexesConfig = PackageRootPath + "/etc/mix/resource/reg_reserved_user_id_config.xml"
|
||||
|
||||
|
|
|
@ -13,3 +13,9 @@ const (
|
|||
const (
|
||||
AccountPunishment_BlockFromCreatingMoment = 0 // 禁止发贴
|
||||
)
|
||||
|
||||
const (
|
||||
Recomm_Down = 0
|
||||
Recomm_Up = 1
|
||||
Recomm_Init = 2
|
||||
)
|
||||
|
|
|
@ -166,6 +166,9 @@ var ErrCodeMsgMap = map[ErrCode]string{
|
|||
|
||||
ErrCodeAccountCancellationSrvFail: "账户注销服务错误",
|
||||
ErrCodeAccountCancellationNotExist: "账户注销不存在",
|
||||
|
||||
ErrCodeUserVisitOffsetSrvFail: "用户游标表服务错误",
|
||||
ErrCodeUserVisitOffsetNotExist: "用户游标表不存在",
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -380,6 +383,16 @@ const (
|
|||
ErrCodeMomentAuditTaskSrvFail ErrCode = -29001 // 动态审核任务表服务错误
|
||||
ErrCodeMomentAuditTaskNotExist ErrCode = -29002 // 动态审核任务表不存在
|
||||
|
||||
// AccountCancellation: 30xxx
|
||||
ErrCodeAccountCancellationSrvOk ErrCode = ErrCodeOk
|
||||
ErrCodeAccountCancellationSrvFail ErrCode = -30001 // 账户注销服务错误
|
||||
ErrCodeAccountCancellationNotExist ErrCode = -30002 // 账户注销不存在
|
||||
|
||||
// UserVisitOffset: 30xxx
|
||||
ErrCodeUserVisitOffsetSrvOk ErrCode = ErrCodeOk
|
||||
ErrCodeUserVisitOffsetSrvFail ErrCode = -31001 // 用户游标表服务错误
|
||||
ErrCodeUserVisitOffsetNotExist ErrCode = -31002 // 用户游标表不存在
|
||||
|
||||
// AccountPunishment: 32xxx
|
||||
ErrCodeAccountPunishmentSrvOk ErrCode = ErrCodeOk
|
||||
ErrCodeAccountPunishmentSrvFail ErrCode = -32001 // 账号处罚服务错误
|
||||
|
@ -390,11 +403,6 @@ const (
|
|||
ErrCodeAccountPunishmentHasBeenInterrupted ErrCode = -32006 // 账号处罚已提前中止
|
||||
ErrCodeAccountPunishmentStreamerOnly ErrCode = -32007 // 该账号处罚仅能对主播进行
|
||||
|
||||
// AccountCancellation: 30xxx
|
||||
ErrCodeAccountCancellationSrvOk ErrCode = ErrCodeOk
|
||||
ErrCodeAccountCancellationSrvFail ErrCode = -30001 // 账户注销服务错误
|
||||
ErrCodeAccountCancellationNotExist ErrCode = -30002 // 账户注销不存在
|
||||
|
||||
// Media: 60xxx
|
||||
ErrCodeMediaSrvOk ErrCode = ErrCodeOk
|
||||
ErrCodeMediaSrvFail ErrCode = -60001 // 媒体服务错误
|
||||
|
|
|
@ -140,3 +140,18 @@ type ApiListByIdsResp struct {
|
|||
base.BaseResponse
|
||||
Data *ApiListByIdsData `json:"data"`
|
||||
}
|
||||
|
||||
// api 推荐
|
||||
type ApiRecommListReq struct {
|
||||
base.BaseRequest
|
||||
OpType int64 `json:"op_type"`
|
||||
}
|
||||
|
||||
type ApiRecommListData struct {
|
||||
RecommList []*ApiMomentVO `json:"recomm_list"`
|
||||
}
|
||||
|
||||
type ApiRecommListResp struct {
|
||||
base.BaseResponse
|
||||
Data *ApiRecommListData `json:"data"`
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ type ApiListReq struct {
|
|||
base.BaseRequest
|
||||
Offset int `json:"offset"`
|
||||
Limit int `json:"limit"`
|
||||
Sort string
|
||||
Sort []string
|
||||
}
|
||||
|
||||
type ApiListData struct {
|
||||
|
@ -180,12 +180,11 @@ type ApiListStreamerWxIdResp struct {
|
|||
// api 推荐
|
||||
type ApiRecommListReq struct {
|
||||
base.BaseRequest
|
||||
Offset int `json:"offset"`
|
||||
Limit int `json:"limit"`
|
||||
OpType int64 `json:"op_type"`
|
||||
}
|
||||
|
||||
type ApiRecommListData struct {
|
||||
RecommList []int64 `json:"recomm_list"`
|
||||
RecommList []*ApiListExtVO `json:"recomm_list"`
|
||||
}
|
||||
|
||||
type ApiRecommListResp struct {
|
||||
|
|
|
@ -52,7 +52,7 @@ type OpListReq struct {
|
|||
base.BaseRequest
|
||||
Offset int `json:"offset"`
|
||||
Limit int `json:"limit"`
|
||||
Sort string
|
||||
Sort []string
|
||||
}
|
||||
|
||||
type OpListData struct {
|
||||
|
|
|
@ -107,6 +107,7 @@ func Init(r *gin.Engine) {
|
|||
apiMomentGroup.POST("list_by_mids", middleware.JSONParamValidator(momentproto.ApiListByMidsReq{}), middleware.JwtAuthenticator(), ApiGetMomentListByMids)
|
||||
apiMomentGroup.POST("thumbs_up", middleware.JSONParamValidator(momentproto.ApiThumbsUpReq{}), middleware.JwtAuthenticator(), ApiThumbsUpMoment)
|
||||
apiMomentGroup.POST("list_by_ids", middleware.JSONParamValidator(momentproto.ApiListByIdsReq{}), middleware.JwtAuthenticator(), ApiGetMomentListByIds)
|
||||
apiMomentGroup.POST("recomm_list", middleware.JSONParamValidator(momentproto.ApiRecommListReq{}), middleware.JwtAuthenticator(), ApiGetMomentRecommList)
|
||||
|
||||
// 足迹
|
||||
// apiFootPrintGroup := r.Group("/api/footprint", PrepareToC())
|
||||
|
@ -323,7 +324,7 @@ func Init(r *gin.Engine) {
|
|||
opStreamerGroup.POST("list_ext_fuzzily_by_user_id", middleware.JSONParamValidator(streamerproto.OpListExtFuzzilyByUserIdReq{}), middleware.JwtAuthenticator(), OpGetStreamerExtListFuzzilyByUserId)
|
||||
opStreamerGroup.POST("list_ext_fuzzily_by_name", middleware.JSONParamValidator(streamerproto.OpListExtFuzzilyByNameReq{}), middleware.JwtAuthenticator(), OpGetStreamerExtListFuzzilyByName)
|
||||
opStreamerGroup.POST("list_wx_id", middleware.JSONParamValidator(streamerproto.OpListStreamerWxIdReq{}), middleware.JwtAuthenticator(), OpGetStreamerWxId)
|
||||
opStreamerGroup.POST("recomm_list", middleware.JSONParamValidator(streamerproto.OpRecommListReq{}), middleware.JwtAuthenticator(), OpGetStreamerRecommList)
|
||||
//opStreamerGroup.POST("recomm_list", middleware.JSONParamValidator(streamerproto.OpRecommListReq{}), middleware.JwtAuthenticator(), OpGetStreamerRecommList)
|
||||
|
||||
// 意见反馈
|
||||
opFeedbackGroup := r.Group("/op/feedback", PrepareOp())
|
||||
|
|
|
@ -187,3 +187,25 @@ func ApiThumbsUpMoment(ctx *gin.Context) {
|
|||
|
||||
ReplyOk(ctx, nil)
|
||||
}
|
||||
|
||||
func ApiGetMomentRecommList(ctx *gin.Context) {
|
||||
req := ctx.MustGet("client_req").(*momentproto.ApiRecommListReq)
|
||||
|
||||
list, ec := service.DefaultService.ApiGetMomentRecommList(ctx, req)
|
||||
if ec != errcode.ErrCodeMomentSrvOk {
|
||||
logger.Error("OpGetMomentRecommList fail, req: %v, ec: %v", util.ToJson(req), ec)
|
||||
ReplyErrCodeMsg(ctx, ec)
|
||||
return
|
||||
}
|
||||
|
||||
mediaFillableList := make([]mediafiller.MediaFillable, len(list))
|
||||
for i, media := range list {
|
||||
mediaFillableList[i] = media.MediaComp
|
||||
}
|
||||
mediafiller.FillList(ctx, mediaFillableList)
|
||||
|
||||
data := &momentproto.ApiRecommListData{
|
||||
RecommList: list,
|
||||
}
|
||||
ReplyOk(ctx, data)
|
||||
}
|
||||
|
|
|
@ -223,6 +223,16 @@ func ApiGetStreamerRecommList(ctx *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
objectMediaNum := 4 // 单个主播服务总共4个媒体类
|
||||
mediaFillableList := make([]mediafiller.MediaFillable, len(list)*objectMediaNum)
|
||||
for i, vo := range list {
|
||||
mediaFillableList[objectMediaNum*i+0] = vo.Avatar
|
||||
mediaFillableList[objectMediaNum*i+1] = vo.Cover
|
||||
mediaFillableList[objectMediaNum*i+2] = vo.Shorts
|
||||
mediaFillableList[objectMediaNum*i+3] = vo.Album
|
||||
}
|
||||
mediafiller.FillList(ctx, mediaFillableList)
|
||||
|
||||
data := &streamerproto.ApiRecommListData{
|
||||
RecommList: list,
|
||||
}
|
||||
|
|
|
@ -242,19 +242,3 @@ func OpGetStreamerWxId(ctx *gin.Context) {
|
|||
}
|
||||
ReplyOk(ctx, data)
|
||||
}
|
||||
|
||||
func OpGetStreamerRecommList(ctx *gin.Context) {
|
||||
req := ctx.MustGet("client_req").(*streamerproto.OpRecommListReq)
|
||||
|
||||
list, ec := service.DefaultService.OpGetStreamerRecommList(ctx, req)
|
||||
if ec != errcode.ErrCodeStreamerSrvOk {
|
||||
logger.Error("OpGetStreamerRecommList fail, req: %v, ec: %v", util.ToJson(req), ec)
|
||||
ReplyErrCodeMsg(ctx, ec)
|
||||
return
|
||||
}
|
||||
|
||||
data := &streamerproto.OpRecommListData{
|
||||
RecommList: list,
|
||||
}
|
||||
ReplyOk(ctx, data)
|
||||
}
|
||||
|
|
|
@ -164,6 +164,12 @@ const (
|
|||
|
||||
DBAccountCancellation = "account_cancellation"
|
||||
COLAccountCancellation = "account_cancellation"
|
||||
|
||||
DBUserVisitOffset = "user_visit_offset"
|
||||
COLUserVisitOffset = "user_visit_offset"
|
||||
|
||||
DBUserMomentVisitOffset = "user_moment_visit_offset"
|
||||
COLUserMomentVisitOffset = "user_moment_visit_offset"
|
||||
)
|
||||
|
||||
// 商品表
|
||||
|
@ -390,6 +396,16 @@ func (m *Mongo) getColAccountCancellation() *qmgo.Collection {
|
|||
return m.clientMix.Database(DBAccountCancellation).Collection(COLAccountCancellation)
|
||||
}
|
||||
|
||||
// 用户访问偏移量表
|
||||
func (m *Mongo) getColUserVisitOffset() *qmgo.Collection {
|
||||
return m.clientMix.Database(DBUserVisitOffset).Collection(COLUserVisitOffset)
|
||||
}
|
||||
|
||||
// 用户动态访问偏移量表
|
||||
func (m *Mongo) getColUserMomentVisitOffset() *qmgo.Collection {
|
||||
return m.clientMix.Database(DBUserMomentVisitOffset).Collection(COLUserMomentVisitOffset)
|
||||
}
|
||||
|
||||
// 商品相关
|
||||
func (m *Mongo) CreateProduct(ctx *gin.Context, product *dbstruct.Product) error {
|
||||
col := m.getColProduct()
|
||||
|
@ -2120,10 +2136,10 @@ func (m *Mongo) GetStreamerList(ctx *gin.Context, req *streamerproto.OpListReq)
|
|||
"del_flag": 0,
|
||||
}
|
||||
//排序规则
|
||||
if req.Sort == "" {
|
||||
req.Sort = "-ct"
|
||||
if len(req.Sort) == 0 {
|
||||
req.Sort = append(req.Sort, "-ct")
|
||||
}
|
||||
err := col.Find(ctx, query).Sort(req.Sort).Skip(int64(req.Offset)).Limit(int64(req.Limit)).All(&list)
|
||||
err := col.Find(ctx, query).Sort(req.Sort...).Skip(int64(req.Offset)).Limit(int64(req.Limit)).All(&list)
|
||||
if err == qmgo.ErrNoSuchDocuments {
|
||||
err = nil
|
||||
return list, err
|
||||
|
@ -3609,3 +3625,81 @@ func (m *Mongo) GetAccountCancellationListByMid(ctx *gin.Context, req *account_c
|
|||
}
|
||||
return accountCancellation, err
|
||||
}
|
||||
|
||||
func (m *Mongo) CreateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error {
|
||||
col := m.getColUserVisitOffset()
|
||||
_, err := col.InsertOne(ctx, uservisitoffset)
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *Mongo) UpdateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error {
|
||||
col := m.getColUserVisitOffset()
|
||||
filter := qmgo.M{
|
||||
"_id": uservisitoffset.Id,
|
||||
"ver": uservisitoffset.Ver,
|
||||
}
|
||||
set := qmgo.M{
|
||||
"streamer_recomm_offset": uservisitoffset.StreamerRecommOffset,
|
||||
}
|
||||
if uservisitoffset.BottomFlag != nil {
|
||||
set["bottom_flag"] = uservisitoffset.GetBottomFlag()
|
||||
}
|
||||
update := qmgo.M{
|
||||
"$set": set,
|
||||
}
|
||||
err := col.UpdateOne(ctx, filter, update)
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *Mongo) GetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserVisitOffset, error) {
|
||||
uservisitoffset := &dbstruct.UserVisitOffset{}
|
||||
col := m.getColUserVisitOffset()
|
||||
query := qmgo.M{
|
||||
"_id": mid,
|
||||
}
|
||||
err := col.Find(ctx, query).One(uservisitoffset)
|
||||
if err == qmgo.ErrNoSuchDocuments {
|
||||
err = nil
|
||||
return nil, err
|
||||
}
|
||||
return uservisitoffset, err
|
||||
}
|
||||
|
||||
func (m *Mongo) CreateUserMomentVisitOffset(ctx *gin.Context, usermomentvisitoffset *dbstruct.UserMomentVisitOffset) error {
|
||||
col := m.getColUserMomentVisitOffset()
|
||||
_, err := col.InsertOne(ctx, usermomentvisitoffset)
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *Mongo) UpdateUserMomentVisitOffset(ctx *gin.Context, usermomentvisitoffset *dbstruct.UserMomentVisitOffset) error {
|
||||
col := m.getColUserMomentVisitOffset()
|
||||
filter := qmgo.M{
|
||||
"_id": usermomentvisitoffset.Id,
|
||||
"ver": usermomentvisitoffset.Ver,
|
||||
}
|
||||
set := qmgo.M{
|
||||
"moment_recomm_offset": usermomentvisitoffset.MomentRecommOffset,
|
||||
}
|
||||
if usermomentvisitoffset.BottomFlag != nil {
|
||||
set["bottom_flag"] = usermomentvisitoffset.GetBottomFlag()
|
||||
}
|
||||
update := qmgo.M{
|
||||
"$set": set,
|
||||
}
|
||||
err := col.UpdateOne(ctx, filter, update)
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *Mongo) GetUserMomentVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserMomentVisitOffset, error) {
|
||||
usermomentvisitoffset := &dbstruct.UserMomentVisitOffset{}
|
||||
col := m.getColUserMomentVisitOffset()
|
||||
query := qmgo.M{
|
||||
"_id": mid,
|
||||
}
|
||||
err := col.Find(ctx, query).One(usermomentvisitoffset)
|
||||
if err == qmgo.ErrNoSuchDocuments {
|
||||
err = nil
|
||||
return nil, err
|
||||
}
|
||||
return usermomentvisitoffset, err
|
||||
}
|
||||
|
|
|
@ -1339,7 +1339,7 @@ func (s *Service) ApiGetStreamerWxId(ctx *gin.Context, req *streamerproto.ApiLis
|
|||
}
|
||||
|
||||
// 推荐
|
||||
func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.ApiRecommListReq) (recommlist []int64, ec errcode.ErrCode) {
|
||||
func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.ApiRecommListReq) (recommStreamerList []*streamerproto.ApiListExtVO, ec errcode.ErrCode) {
|
||||
|
||||
ec = errcode.ErrCodeStreamerSrvOk
|
||||
|
||||
|
@ -1351,48 +1351,15 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.
|
|||
return nil, errcode.ErrCodeApolloReadFail
|
||||
}
|
||||
|
||||
//1.从redis中获取数据
|
||||
err = redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist)
|
||||
// 从redis中获取主播列表
|
||||
recommlist, err := s.utilGetStreamerRecommList(ctx)
|
||||
if err != nil {
|
||||
logger.Error("Redis read failed : %v", err)
|
||||
ec = errcode.ErrCodeStreamerRecommListRedisCacheInvalid
|
||||
logger.Error("utilGetStreamerRecommList fail, err: %v", err)
|
||||
ec = errcode.ErrCodeStreamerSrvFail
|
||||
return
|
||||
}
|
||||
|
||||
//2.若redis命中失败,再从数据库查
|
||||
if len(recommlist) == 0 {
|
||||
logger.Error("Redis hit failed, reading recommendation list from mongo...")
|
||||
|
||||
// recommlist, _, err = _DefaultAccountRelation.OpIsFollowedCount(ctx)
|
||||
// if err != nil {
|
||||
// logger.Error("OpIsFollowedCount fail, err: %v", err)
|
||||
// ec = errcode.ErrCodeAccountRelationSrvFail
|
||||
// return
|
||||
// }
|
||||
|
||||
list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{
|
||||
Sort: "-fans",
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("OpList fail, err: %v", err)
|
||||
ec = errcode.ErrCodeAccountRelationSrvFail
|
||||
return
|
||||
}
|
||||
recommlist = make([]int64, len(list))
|
||||
for i, streamer := range list {
|
||||
recommlist[i] = util.DerefInt64(streamer.Mid)
|
||||
}
|
||||
|
||||
//若数据库命中成功,则立即加载进redis
|
||||
if len(recommlist) != 0 {
|
||||
err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0)
|
||||
if err != nil {
|
||||
logger.Error("Redis cache fail, err: %v", err)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// 查询是否限制访客
|
||||
visitorMid := req.BaseRequest.Mid
|
||||
for _, restricted_visitor_mid := range cfg.RestrictedVisitorMids {
|
||||
if restricted_visitor_mid == visitorMid { // 是限制访问的访客
|
||||
|
@ -1411,6 +1378,13 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.
|
|||
}
|
||||
}
|
||||
|
||||
recommStreamerList, err = s.utilGetStreamerRecommListVO(ctx, recommlist, req.Mid, req.OpType)
|
||||
if err != nil {
|
||||
logger.Error("utilGetUpStreamerRecommList fail, err: %v", err)
|
||||
ec = errcode.ErrCodeStreamerSrvFail
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1710,6 +1684,21 @@ func (s *Service) ApiGetContactCustomerServiceSessionListByMid(ctx *gin.Context,
|
|||
func (s *Service) ApiCreateMoment(ctx *gin.Context, req *momentproto.ApiCreateReq) (ec errcode.ErrCode, acctPunEndTime string) {
|
||||
ec = errcode.ErrCodeMomentSrvOk
|
||||
|
||||
// 创建完动态,将动态加载到当前redis数组
|
||||
defer func() {
|
||||
if req.Moment.Id == nil {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
id := util.DerefInt64(req.Moment.Id)
|
||||
err := redis.GetRedisClient().RPush(consts.RedisMomentPrefix+"recent_list", id)
|
||||
if err != nil {
|
||||
logger.Error("Push newly-created moment to list failed : %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
}()
|
||||
|
||||
req.Moment.Mid = goproto.Int64(req.BaseRequest.Mid)
|
||||
var accountpunishment *dbstruct.AccountPunishment
|
||||
if ec, accountpunishment = s.ApiCreateMomentBusinessValidate(ctx, req); ec != errcode.ErrCodeMomentSrvOk {
|
||||
|
@ -2010,6 +1999,15 @@ func (s *Service) ApiGetMomentListByIds(ctx *gin.Context, req *momentproto.ApiLi
|
|||
return
|
||||
}
|
||||
|
||||
// 获取访问者的关注列表
|
||||
visitorMid := req.GetBaseRequest().Mid
|
||||
followMap, err := s.utilGetFollowMap(ctx, visitorMid)
|
||||
if err != nil {
|
||||
logger.Error("utilGetFollowMap fail")
|
||||
ec = errcode.ErrCodeAccountRelationSrvFail
|
||||
return
|
||||
}
|
||||
|
||||
// 填充主播信息
|
||||
vos, ec := s.utilFillMomentsStreamerInfo(ctx, list, consts.InterfaceType_Api)
|
||||
if ec != errcode.ErrCodeMomentSrvOk {
|
||||
|
@ -2019,8 +2017,16 @@ func (s *Service) ApiGetMomentListByIds(ctx *gin.Context, req *momentproto.ApiLi
|
|||
}
|
||||
voList = make([]*momentproto.ApiMomentVO, 0)
|
||||
for _, vo := range vos {
|
||||
opVO, _ := vo.(*momentproto.ApiMomentVO)
|
||||
voList = append(voList, opVO)
|
||||
apiVO, _ := vo.(*momentproto.ApiMomentVO)
|
||||
// 填充是否关注
|
||||
s.utilFillIsFollowedFillable(ctx, followMap, apiVO)
|
||||
// 填充是否点赞
|
||||
if err := s.utilFillIsThumbedUpFillable(ctx, visitorMid, apiVO); err != nil {
|
||||
logger.Error("utilFillIsThumbedUpFillable fail")
|
||||
ec = errcode.ErrCodeThumbsUpSrvFail
|
||||
return
|
||||
}
|
||||
voList = append(voList, apiVO)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -2053,6 +2059,44 @@ func (s *Service) ApiThumbsUpMoment(ctx *gin.Context, req *momentproto.ApiThumbs
|
|||
return
|
||||
}
|
||||
|
||||
// 推荐
|
||||
func (s *Service) ApiGetMomentRecommList(ctx *gin.Context, req *momentproto.ApiRecommListReq) (recommMomentList []*momentproto.ApiMomentVO, ec errcode.ErrCode) {
|
||||
|
||||
ec = errcode.ErrCodeMomentSrvOk
|
||||
|
||||
// 1.查找本时段有无新发动态
|
||||
ids, err := s.utilGetMomentRecentListIds(ctx, req.Mid, req.OpType, consts.MomentRecommThroughput)
|
||||
if err != nil {
|
||||
logger.Error("utilGetMomentRecentListIds fail, err: %v", err)
|
||||
ec = errcode.ErrCodeMomentSrvFail
|
||||
return
|
||||
}
|
||||
|
||||
// 2.删去已查询的新动态
|
||||
recommThroughput := int64(consts.MomentRecommThroughput - len(ids))
|
||||
|
||||
// 3.补齐后续动态
|
||||
if recommThroughput > 0 {
|
||||
recommIds, err := s.utilGetMomentRecommListIds(ctx, req.Mid, req.OpType, recommThroughput)
|
||||
if err != nil {
|
||||
logger.Error("utilGetMomentRecommListIds fail, err: %v", err)
|
||||
ec = errcode.ErrCodeMomentSrvFail
|
||||
return
|
||||
}
|
||||
ids = append(ids, recommIds...)
|
||||
}
|
||||
|
||||
// 4.查询动态内容
|
||||
recommMomentList, err = s.utilGetApiMomentVOListByIds(ctx, req.Mid, ids)
|
||||
if err != nil {
|
||||
logger.Error("utilGetMomentRecentListIds fail, err: %v", err)
|
||||
ec = errcode.ErrCodeMomentSrvFail
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Service) ApiGetThumbsUpList(ctx *gin.Context, req *thumbsupproto.ApiListReq) (list []*dbstruct.ThumbsUp, ec errcode.ErrCode) {
|
||||
ec = errcode.ErrCodeThumbsUpSrvOk
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ func (s *CronService) Init(c any) (err error) {
|
|||
//exec.RegTask("clear_content_audit_batch_execution_logs", s.ClearContentAuditBatchExecutionLogs)
|
||||
exec.RegTask("send_contact_customer_services_of_last_minute", s.SendContactCustomerServicesOfLastMinute)
|
||||
exec.RegTask("cancel_account_at_due_time", s.CancelAccountsAtDueTime)
|
||||
exec.RegTask("reload_moment_recomm_list", s.ReloadMomentRecommList)
|
||||
exec.LogHandler(customLogHandle)
|
||||
//注册任务handler
|
||||
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
package logic
|
||||
|
||||
import (
|
||||
"service/app/mix/dao"
|
||||
"service/dbstruct"
|
||||
"service/library/logger"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type UserMomentVisitOffset struct {
|
||||
store *dao.Store
|
||||
}
|
||||
|
||||
func NewUserMomentVisitOffset(store *dao.Store) (a *UserMomentVisitOffset) {
|
||||
a = &UserMomentVisitOffset{
|
||||
store: store,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (p *UserMomentVisitOffset) OpCreate(ctx *gin.Context, uservisitoffset *dbstruct.UserMomentVisitOffset) error {
|
||||
err := p.store.CreateUserMomentVisitOffset(ctx, uservisitoffset)
|
||||
if err != nil {
|
||||
logger.Error("CreateUserMomentVisitOffset fail, err: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *UserMomentVisitOffset) OpUpdate(ctx *gin.Context, uservisitoffset *dbstruct.UserMomentVisitOffset) error {
|
||||
err := p.store.UpdateUserMomentVisitOffset(ctx, uservisitoffset)
|
||||
if err != nil {
|
||||
logger.Error("UpdateUserMomentVisitOffset fail, err: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *UserMomentVisitOffset) OpGetUserMomentVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserMomentVisitOffset, error) {
|
||||
|
||||
uservisitoffset, err := p.store.GetUserMomentVisitOffset(ctx, mid)
|
||||
if err != nil {
|
||||
logger.Error("GetUserMomentVisitOffset fail, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return uservisitoffset, nil
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package logic
|
||||
|
||||
import (
|
||||
"service/app/mix/dao"
|
||||
"service/dbstruct"
|
||||
"service/library/logger"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type UserVisitOffset struct {
|
||||
store *dao.Store
|
||||
}
|
||||
|
||||
func NewUserVisitOffset(store *dao.Store) (a *UserVisitOffset) {
|
||||
a = &UserVisitOffset{
|
||||
store: store,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (p *UserVisitOffset) OpCreate(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error {
|
||||
err := p.store.CreateUserVisitOffset(ctx, uservisitoffset)
|
||||
if err != nil {
|
||||
logger.Error("CreateUserVisitOffset fail, err: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *UserVisitOffset) OpUpdate(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error {
|
||||
err := p.store.UpdateUserVisitOffset(ctx, uservisitoffset)
|
||||
if err != nil {
|
||||
logger.Error("UpdateUserVisitOffset fail, err: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *UserVisitOffset) OpGetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserVisitOffset, error) {
|
||||
|
||||
uservisitoffset, err := p.store.GetUserVisitOffset(ctx, mid)
|
||||
if err != nil {
|
||||
logger.Error("GetUserVisitOffset fail, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return uservisitoffset, nil
|
||||
}
|
|
@ -51,7 +51,6 @@ import (
|
|||
"service/library/mycrypto"
|
||||
"service/library/payclients/alipaycli"
|
||||
"service/library/payclients/wxpaycli"
|
||||
"service/library/redis"
|
||||
|
||||
accountpunishmentproto "service/api/proto/accountpunishment/proto"
|
||||
|
||||
|
@ -105,6 +104,8 @@ var (
|
|||
_DefaultMomentAuditTask *logic.MomentAuditTask
|
||||
_DefaultAccountPunishment *logic.AccountPunishment
|
||||
_DefaultAccountCancellation *logic.AccountCancellation
|
||||
_DefaultUserVisitOffset *logic.UserVisitOffset
|
||||
_DefaultUserMomentVisitOffset *logic.UserMomentVisitOffset
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
|
@ -183,6 +184,8 @@ func (s *Service) Init(c any) (err error) {
|
|||
_DefaultMomentAuditTask = logic.NewMomentAuditTask(store)
|
||||
_DefaultAccountPunishment = logic.NewAccountPunishment(store)
|
||||
_DefaultAccountCancellation = logic.NewAccountCancellation(store)
|
||||
_DefaultUserVisitOffset = logic.NewUserVisitOffset(store)
|
||||
_DefaultUserMomentVisitOffset = logic.NewUserMomentVisitOffset(store)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1985,56 +1988,6 @@ func (s *Service) OpGetStreamerWxId(ctx *gin.Context, req *streamerproto.OpListS
|
|||
return
|
||||
}
|
||||
|
||||
// 推荐
|
||||
func (s *Service) OpGetStreamerRecommList(ctx *gin.Context, req *streamerproto.OpRecommListReq) (recommlist []int64, ec errcode.ErrCode) {
|
||||
|
||||
ec = errcode.ErrCodeStreamerSrvOk
|
||||
|
||||
//1.从redis中获取数据
|
||||
err := redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist)
|
||||
if err != nil {
|
||||
logger.Error("Redis read failed : %v", err)
|
||||
ec = errcode.ErrCodeStreamerRecommListRedisCacheInvalid
|
||||
return
|
||||
}
|
||||
|
||||
//2.若redis命中失败,再从数据库查
|
||||
if len(recommlist) == 0 {
|
||||
logger.Error("Redis hit failed, reading recommendation list from mongo...")
|
||||
|
||||
// recommlist, _, err = _DefaultAccountRelation.OpIsFollowedCount(ctx)
|
||||
// if err != nil {
|
||||
// logger.Error("OpIsFollowedCount fail, err: %v", err)
|
||||
// ec = errcode.ErrCodeAccountRelationSrvFail
|
||||
// return
|
||||
// }
|
||||
|
||||
list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{
|
||||
Sort: "-fans",
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("OpList fail, err: %v", err)
|
||||
ec = errcode.ErrCodeAccountRelationSrvFail
|
||||
return
|
||||
}
|
||||
recommlist = make([]int64, len(list))
|
||||
for i, streamer := range list {
|
||||
recommlist[i] = util.DerefInt64(streamer.Mid)
|
||||
}
|
||||
|
||||
//若数据库命中成功,则立即加载进redis
|
||||
if len(recommlist) != 0 {
|
||||
err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0)
|
||||
if err != nil {
|
||||
logger.Error("Redis cache fail, err: %v", err)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Feedback
|
||||
func (s *Service) OpCreateFeedback(ctx *gin.Context, req *feedbackproto.OpCreateReq) (ec errcode.ErrCode) {
|
||||
ec = errcode.ErrCodeFeedbackSrvOk
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"service/dbstruct"
|
||||
"service/library/apollo"
|
||||
"service/library/logger"
|
||||
"service/library/redis"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -728,3 +729,575 @@ func (s *Service) utilCancelAccountByMids(ctx *gin.Context, midList []int64) err
|
|||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (s *Service) utilGetStreamerRecommList(ctx *gin.Context) (recommlist []int64, err error) {
|
||||
|
||||
// 1.从redis中获取数据
|
||||
err = redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist)
|
||||
if err != nil {
|
||||
logger.Error("Redis read failed : %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 2.若redis命中失败,再从数据库查
|
||||
if len(recommlist) == 0 {
|
||||
logger.Error("Redis hit failed, reading recommendation list from mongo...")
|
||||
|
||||
list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{
|
||||
Sort: []string{"-fans"},
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("OpList fail, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
recommlist = make([]int64, len(list))
|
||||
for i, streamer := range list {
|
||||
recommlist[i] = util.DerefInt64(streamer.Mid)
|
||||
}
|
||||
|
||||
//若数据库命中成功,则立即加载进redis
|
||||
if len(recommlist) != 0 {
|
||||
err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0)
|
||||
if err != nil {
|
||||
logger.Error("Redis cache fail, err: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Service) utilGetInitUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (err error) {
|
||||
uservisitoffset, err := _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid)
|
||||
if err != nil {
|
||||
logger.Error("OpGetUserVisitOffset fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
var execFunc func(*gin.Context, *dbstruct.UserVisitOffset) error
|
||||
|
||||
if uservisitoffset == nil {
|
||||
execFunc = _DefaultUserVisitOffset.OpCreate
|
||||
} else {
|
||||
execFunc = _DefaultUserVisitOffset.OpUpdate
|
||||
}
|
||||
|
||||
// 吞吐量大于等于推荐数组长度,则这次获取后已经触底
|
||||
if consts.StreamerRecommThroughput >= recommlistLength {
|
||||
err = execFunc(ctx, &dbstruct.UserVisitOffset{
|
||||
Id: mid,
|
||||
StreamerRecommOffset: 0,
|
||||
BottomFlag: goproto.Int64(1),
|
||||
Ver: 0,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("_DefaultUserVisitOffset OpCreate fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
err = execFunc(ctx, &dbstruct.UserVisitOffset{
|
||||
Id: mid,
|
||||
StreamerRecommOffset: consts.StreamerRecommThroughput,
|
||||
BottomFlag: goproto.Int64(0),
|
||||
Ver: 0,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("_DefaultUserVisitOffset OpCreate fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) utilGetUpUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserVisitOffset, err error) {
|
||||
uservisitoffset, err = _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid)
|
||||
if err != nil {
|
||||
logger.Error("OpGetUserVisitOffset fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
if uservisitoffset == nil {
|
||||
// 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建
|
||||
return nil, qmgo.ErrNoSuchDocuments
|
||||
} else {
|
||||
nowoffset := (uservisitoffset.StreamerRecommOffset + consts.StreamerRecommThroughput) % recommlistLength
|
||||
// 向上操作固定清掉触底标志
|
||||
var bottomFlagPtr *int64 = nil
|
||||
if uservisitoffset.GetBottomFlag() == 1 {
|
||||
bottomFlagPtr = goproto.Int64(0)
|
||||
}
|
||||
err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{
|
||||
Id: uservisitoffset.Id,
|
||||
StreamerRecommOffset: nowoffset,
|
||||
BottomFlag: bottomFlagPtr,
|
||||
Ver: uservisitoffset.Ver,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Service) utilGetDownUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserVisitOffset, err error) {
|
||||
uservisitoffset, err = _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid)
|
||||
if err != nil {
|
||||
logger.Error("OpGetUserVisitOffset fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if uservisitoffset == nil {
|
||||
// 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建
|
||||
return nil, qmgo.ErrNoSuchDocuments
|
||||
} else {
|
||||
if uservisitoffset.GetBottomFlag() == 1 {
|
||||
return
|
||||
}
|
||||
offset := uservisitoffset.StreamerRecommOffset
|
||||
if offset+consts.StreamerRecommThroughput >= recommlistLength {
|
||||
err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{
|
||||
Id: uservisitoffset.Id,
|
||||
StreamerRecommOffset: 0,
|
||||
BottomFlag: goproto.Int64(1),
|
||||
Ver: uservisitoffset.Ver,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{
|
||||
Id: uservisitoffset.Id,
|
||||
StreamerRecommOffset: offset + consts.StreamerRecommThroughput,
|
||||
Ver: uservisitoffset.Ver,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// 获取该用户当前在主播推荐数组中的游标和是否已经触底标志
|
||||
func (s *Service) utilGetStreamerRecommlistOffsetAndBottomeFlag(ctx *gin.Context, recommListLength int64, mid int64, opType int64) (offset int64, bottomFlag int64, err error) {
|
||||
offset = int64(0)
|
||||
var uservisitoffset *dbstruct.UserVisitOffset
|
||||
|
||||
// 初始化操作
|
||||
switch opType {
|
||||
case consts.Recomm_Init:
|
||||
err = s.utilGetInitUserVisitOffset(ctx, mid, int64(recommListLength))
|
||||
if err != nil {
|
||||
logger.Error("utilGetInitUserVisitOffset fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
// 向上滚动操作
|
||||
case consts.Recomm_Up:
|
||||
// 若吞吐量比推荐数组长度小,则正常操作,否则认为游标永远是0
|
||||
if consts.StreamerRecommThroughput < recommListLength {
|
||||
uservisitoffset, err = s.utilGetUpUserVisitOffset(ctx, mid, int64(recommListLength))
|
||||
if err != nil {
|
||||
logger.Error("utilGetUpUserVisitOffset fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
offset = uservisitoffset.StreamerRecommOffset
|
||||
}
|
||||
// 向下滚动操作
|
||||
case consts.Recomm_Down:
|
||||
// 若吞吐量比推荐数组长度小,则正常操作,否则认为直接触底
|
||||
if consts.StreamerRecommThroughput < recommListLength {
|
||||
uservisitoffset, err = s.utilGetDownUserVisitOffset(ctx, mid, int64(recommListLength))
|
||||
if err != nil {
|
||||
logger.Error("utilGetUpUserVisitOffset fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
if uservisitoffset.GetBottomFlag() == 1 {
|
||||
return 0, 1, nil
|
||||
}
|
||||
offset = uservisitoffset.StreamerRecommOffset
|
||||
} else {
|
||||
return 0, 1, nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Service) utilGetStreamerRecommListVO(ctx *gin.Context, recommlist []int64, mid int64, opType int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) {
|
||||
// 获取用户游标
|
||||
offset := int64(0)
|
||||
recommListLength := int64(len(recommlist))
|
||||
|
||||
offset, bottomFlag, err := s.utilGetStreamerRecommlistOffsetAndBottomeFlag(ctx, recommListLength, mid, opType)
|
||||
if err != nil {
|
||||
logger.Error("utilGetStreamerRecommlistOffsetAndBottomeFlag fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
if bottomFlag == 1 { // 已触底
|
||||
return make([]*streamerproto.ApiListExtVO, 0), nil
|
||||
}
|
||||
|
||||
// 向下操作,去尾
|
||||
upperBound := int64(consts.StreamerRecommThroughput)
|
||||
if opType == consts.Recomm_Down {
|
||||
surplusVolume := recommListLength - offset
|
||||
if surplusVolume < consts.StreamerRecommThroughput {
|
||||
upperBound = surplusVolume
|
||||
}
|
||||
}
|
||||
|
||||
// 根据用户游标查询得到结果
|
||||
midList := make([]int64, 0)
|
||||
for i := int64(0); i < upperBound; i++ {
|
||||
index := (offset + int64(i)) % int64(recommListLength)
|
||||
midList = append(midList, recommlist[index])
|
||||
}
|
||||
|
||||
accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{
|
||||
Mids: midList,
|
||||
Sort: []string{"_id"},
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("_DefaultAccount OpListByMids fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 构建一个accountList -> midList 索引的map
|
||||
_accountIndexMap := make(map[int]int)
|
||||
for i, account := range accountList {
|
||||
mid1 := util.DerefInt64(account.Mid)
|
||||
for j, mid2 := range midList {
|
||||
if mid1 == mid2 {
|
||||
_accountIndexMap[i] = j
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
streamerExts := make([]streamerproto.StreamerExtVO, len(accountList))
|
||||
for i := range streamerExts {
|
||||
streamerExts[i] = &streamerproto.ApiListExtVO{}
|
||||
}
|
||||
ec := s.utilExtendAccountsIntoStreamerExts(ctx, accountList, streamerExts)
|
||||
if ec != errcode.ErrCodeStreamerSrvOk {
|
||||
err = fmt.Errorf("extend accountlist into streamer_exts fail")
|
||||
logger.Error("Extend accountlist into streamer_exts fail")
|
||||
return
|
||||
}
|
||||
recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts))
|
||||
for i, streamerExt := range streamerExts {
|
||||
vo, _ := streamerExt.(*streamerproto.ApiListExtVO)
|
||||
// 从下标位置映射回去
|
||||
recommStreamerList[_accountIndexMap[i]] = vo
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Service) utilGetInitUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64, throughput int64) (err error) {
|
||||
uservisitoffset, err := _DefaultUserMomentVisitOffset.OpGetUserMomentVisitOffset(ctx, mid)
|
||||
if err != nil {
|
||||
logger.Error("OpGetUserMomentVisitOffset fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
var execFunc func(*gin.Context, *dbstruct.UserMomentVisitOffset) error
|
||||
|
||||
if uservisitoffset == nil {
|
||||
execFunc = _DefaultUserMomentVisitOffset.OpCreate
|
||||
} else {
|
||||
execFunc = _DefaultUserMomentVisitOffset.OpUpdate
|
||||
}
|
||||
|
||||
// 吞吐量大于等于推荐数组长度,则这次获取后已经触底
|
||||
if throughput >= recommlistLength {
|
||||
err = execFunc(ctx, &dbstruct.UserMomentVisitOffset{
|
||||
Id: mid,
|
||||
MomentRecommOffset: 0,
|
||||
BottomFlag: goproto.Int64(1),
|
||||
Ver: 0,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("_DefaultUserMomentVisitOffset OpCreate fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
err = execFunc(ctx, &dbstruct.UserMomentVisitOffset{
|
||||
Id: mid,
|
||||
MomentRecommOffset: consts.MomentRecommThroughput,
|
||||
BottomFlag: goproto.Int64(0),
|
||||
Ver: 0,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("_DefaultUserMomentVisitOffset OpCreate fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) utilGetUpUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64, throughput int64) (uservisitoffset *dbstruct.UserMomentVisitOffset, err error) {
|
||||
uservisitoffset, err = _DefaultUserMomentVisitOffset.OpGetUserMomentVisitOffset(ctx, mid)
|
||||
if err != nil {
|
||||
logger.Error("OpGetUserMomentVisitOffset fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
if uservisitoffset == nil {
|
||||
// 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建
|
||||
return nil, qmgo.ErrNoSuchDocuments
|
||||
} else {
|
||||
nowoffset := (uservisitoffset.MomentRecommOffset + throughput) % recommlistLength
|
||||
// 向上操作固定清掉触底标志
|
||||
var bottomFlagPtr *int64 = nil
|
||||
if uservisitoffset.GetBottomFlag() == 1 {
|
||||
bottomFlagPtr = goproto.Int64(0)
|
||||
}
|
||||
err = _DefaultUserMomentVisitOffset.OpUpdate(ctx, &dbstruct.UserMomentVisitOffset{
|
||||
Id: uservisitoffset.Id,
|
||||
MomentRecommOffset: nowoffset,
|
||||
BottomFlag: bottomFlagPtr,
|
||||
Ver: uservisitoffset.Ver,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("_DefaultUserMomentVisitOffset OpUpdate fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Service) utilGetDownUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64, throughput int64) (uservisitoffset *dbstruct.UserMomentVisitOffset, err error) {
|
||||
uservisitoffset, err = _DefaultUserMomentVisitOffset.OpGetUserMomentVisitOffset(ctx, mid)
|
||||
if err != nil {
|
||||
logger.Error("OpGetUserMomentVisitOffset fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if uservisitoffset == nil {
|
||||
// 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建
|
||||
return nil, qmgo.ErrNoSuchDocuments
|
||||
} else {
|
||||
if uservisitoffset.GetBottomFlag() == 1 {
|
||||
return
|
||||
}
|
||||
offset := uservisitoffset.MomentRecommOffset
|
||||
if offset+throughput >= recommlistLength {
|
||||
err = _DefaultUserMomentVisitOffset.OpUpdate(ctx, &dbstruct.UserMomentVisitOffset{
|
||||
Id: uservisitoffset.Id,
|
||||
MomentRecommOffset: 0,
|
||||
BottomFlag: goproto.Int64(1),
|
||||
Ver: uservisitoffset.Ver,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("_DefaultUserMomentVisitOffset OpUpdate fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
err = _DefaultUserMomentVisitOffset.OpUpdate(ctx, &dbstruct.UserMomentVisitOffset{
|
||||
Id: uservisitoffset.Id,
|
||||
MomentRecommOffset: offset + throughput,
|
||||
Ver: uservisitoffset.Ver,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("_DefaultUserMomentVisitOffset OpUpdate fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// 获取该用户当前在主播推荐数组中的游标和是否已经触底标志
|
||||
func (s *Service) utilGetMomentRecommlistOffsetAndBottomeFlag(ctx *gin.Context, mid int64, opType int64, recommListLength int64, throughput int64) (offset int64, isAtBottom int64, err error) {
|
||||
offset = int64(0)
|
||||
isRecommListSuffi := throughput <= recommListLength
|
||||
var uservisitoffset *dbstruct.UserMomentVisitOffset
|
||||
|
||||
// 初始化操作
|
||||
switch opType {
|
||||
case consts.Recomm_Init:
|
||||
err = s.utilGetInitUserMomentVisitOffset(ctx, mid, recommListLength, throughput)
|
||||
if err != nil {
|
||||
logger.Error("utilGetInitUserMomentVisitOffset fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
// 向上滚动操作
|
||||
case consts.Recomm_Up:
|
||||
// 若吞吐量比推荐数组长度小,则正常操作,否则认为直接触顶(游标永远是0)
|
||||
if !isRecommListSuffi {
|
||||
return
|
||||
}
|
||||
uservisitoffset, err = s.utilGetUpUserMomentVisitOffset(ctx, mid, recommListLength, throughput)
|
||||
if err != nil {
|
||||
logger.Error("utilGetUpUserMomentVisitOffset fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
offset = uservisitoffset.MomentRecommOffset
|
||||
// 向下滚动操作
|
||||
case consts.Recomm_Down:
|
||||
// 若吞吐量比推荐数组长度小,则正常操作,否则认为直接触底
|
||||
if !isRecommListSuffi {
|
||||
return 0, 1, nil
|
||||
}
|
||||
uservisitoffset, err = s.utilGetDownUserMomentVisitOffset(ctx, mid, recommListLength, throughput)
|
||||
if err != nil {
|
||||
logger.Error("utilGetUpUserMomentVisitOffset fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
if uservisitoffset.GetBottomFlag() == 1 {
|
||||
return 0, 1, nil
|
||||
}
|
||||
offset = uservisitoffset.MomentRecommOffset
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// 根据提供的用户mid和操作类型,从最近推荐列表中获取吞吐量大小的列表,并更新用户游标
|
||||
func (s *Service) utilGetMomentRecentListIds(ctx *gin.Context, mid int64, opType int64, throughput int64) (ids []int64, err error) {
|
||||
|
||||
if opType != consts.Recomm_Up {
|
||||
return
|
||||
}
|
||||
|
||||
// 从redis中获取动态列表长度
|
||||
recentListLength, err := redis.GetRedisClient().LLen(consts.RedisMomentPrefix + "recent_list")
|
||||
if err != nil {
|
||||
logger.Error("Redis read failed : %v", err)
|
||||
return
|
||||
}
|
||||
if recentListLength == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// 获取用户游标
|
||||
offset, err := redis.GetRedisClient().HGetInt64(consts.RedisMomentPrefix+"recent_list_offset", fmt.Sprint(mid))
|
||||
if err != nil && !redis.GetRedisClient().IsErrNil(err) {
|
||||
logger.Error("Redis read failed : %v", err)
|
||||
return
|
||||
}
|
||||
// 检验游标是否还可用
|
||||
ids = make([]int64, 0)
|
||||
incr := 0
|
||||
for ; incr < int(throughput) && offset < recentListLength; incr++ {
|
||||
id, err := redis.GetRedisClient().LIndexInt64(consts.RedisMomentPrefix+"recent_list", int(offset))
|
||||
if err != nil {
|
||||
logger.Error("Redis read failed : %v", err)
|
||||
return make([]int64, 0), err
|
||||
}
|
||||
ids = append(ids, id)
|
||||
offset++
|
||||
}
|
||||
_, err = redis.GetRedisClient().HIncrby(consts.RedisMomentPrefix+"recent_list_offset", fmt.Sprint(mid), incr)
|
||||
if err != nil && !redis.GetRedisClient().IsErrNil(err) {
|
||||
logger.Error("Redis cache failed : %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// 根据提供的用户mid和操作类型,从推荐列表中获取吞吐量大小的列表,并更新用户游标
|
||||
func (s *Service) utilGetMomentRecommListIds(ctx *gin.Context, mid int64, opType int64, throughput int64) (ids []int64, err error) {
|
||||
// 从redis中获取动态列表长度
|
||||
recommListLength, err := redis.GetRedisClient().LLen(consts.RedisMomentPrefix + "recomm_list")
|
||||
if err != nil {
|
||||
logger.Error("Redis read failed : %v", err)
|
||||
return
|
||||
}
|
||||
if recommListLength == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// 获取用户游标
|
||||
offset := int64(0)
|
||||
|
||||
// 获取游标和是否已触底标志
|
||||
offset, bottomFlag, err := s.utilGetMomentRecommlistOffsetAndBottomeFlag(ctx, mid, opType, recommListLength, throughput)
|
||||
if err != nil {
|
||||
logger.Error("utilGetMomentRecommlistOffsetAndBottomeFlag fail, err: %v", err)
|
||||
return
|
||||
}
|
||||
if bottomFlag == 1 { // 已触底
|
||||
return make([]int64, 0), nil
|
||||
}
|
||||
|
||||
// 向下操作,去尾
|
||||
upperBound := throughput
|
||||
if opType == consts.Recomm_Down {
|
||||
surplusVolume := recommListLength - offset
|
||||
if surplusVolume < throughput {
|
||||
upperBound = surplusVolume
|
||||
}
|
||||
}
|
||||
|
||||
// 根据用户游标得到待查询ids
|
||||
ids = make([]int64, 0)
|
||||
for i := int64(0); i < upperBound; i++ {
|
||||
index := (offset + i) % recommListLength
|
||||
id, err := redis.GetRedisClient().LIndexInt64(consts.RedisMomentPrefix+"recomm_list", int(index))
|
||||
if err != nil {
|
||||
logger.Error("Redis read failed : %v", err)
|
||||
return make([]int64, 0), err
|
||||
}
|
||||
ids = append(ids, id)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// 根据提供的ids,查询出动态,并填充主播、是否关注、是否点赞,并保证按照提供的顺序返回ApiMomentVO的list
|
||||
func (s *Service) utilGetApiMomentVOListByIds(ctx *gin.Context, visitorMid int64, ids []int64) (volist []*momentproto.ApiMomentVO, err error) {
|
||||
|
||||
if len(ids) == 0 {
|
||||
return make([]*momentproto.ApiMomentVO, 0), nil
|
||||
}
|
||||
|
||||
list, err := _DefaultMoment.OpListByIds(ctx, &momentproto.OpListByIdsReq{
|
||||
Ids: ids,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("_DefaultMoment OpListByIds fail, , err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// list -> idList 索引的map,保证不打乱顺序
|
||||
_momentIndexMap := make(map[int]int)
|
||||
for i, account := range list {
|
||||
id1 := util.DerefInt64(account.Id)
|
||||
for j, id2 := range ids {
|
||||
if id1 == id2 {
|
||||
_momentIndexMap[i] = j
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 获取访问者的关注列表
|
||||
followMap, err := s.utilGetFollowMap(ctx, visitorMid)
|
||||
if err != nil {
|
||||
logger.Error("utilGetFollowMap fail")
|
||||
return
|
||||
}
|
||||
|
||||
// 填充主播信息
|
||||
vos, ec := s.utilFillMomentsStreamerInfo(ctx, list, consts.InterfaceType_Api)
|
||||
if ec != errcode.ErrCodeMomentSrvOk {
|
||||
logger.Error("utilFillMomentsStreamerInfo fail, , err: %v", err)
|
||||
return
|
||||
}
|
||||
volist = make([]*momentproto.ApiMomentVO, len(vos))
|
||||
for i, vo := range vos {
|
||||
apiVO, _ := vo.(*momentproto.ApiMomentVO)
|
||||
// 填充是否关注
|
||||
s.utilFillIsFollowedFillable(ctx, followMap, apiVO)
|
||||
// 填充是否点赞
|
||||
if err = s.utilFillIsThumbedUpFillable(ctx, visitorMid, apiVO); err != nil {
|
||||
logger.Error("utilFillIsThumbedUpFillable fail")
|
||||
return
|
||||
}
|
||||
volist[_momentIndexMap[i]] = apiVO
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -4,11 +4,11 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"service/api/consts"
|
||||
"service/api/errcode"
|
||||
accountproto "service/api/proto/account/proto"
|
||||
account_cancellationproto "service/api/proto/account_cancellation/proto"
|
||||
contact_customer_service_proto "service/api/proto/contact_customer_service/proto"
|
||||
daily_statementproto "service/api/proto/daily_statement/proto"
|
||||
momentproto "service/api/proto/moment/proto"
|
||||
streamerproto "service/api/proto/streamer/proto"
|
||||
vasproto "service/api/proto/vas/proto"
|
||||
vericodeproto "service/api/proto/vericode/proto"
|
||||
|
@ -31,18 +31,19 @@ import (
|
|||
func (s *CronService) ReloadRecommList(ctx context.Context, param *xxl.RunReq) (msg string) {
|
||||
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
|
||||
logger.Info("Refreshing recommendation list cached in redis...")
|
||||
list, ec := DefaultService.OpGetStreamerList(&gin.Context{}, &streamerproto.OpListReq{
|
||||
Sort: "-fans",
|
||||
list, err := _DefaultStreamer.OpList(&gin.Context{}, &streamerproto.OpListReq{
|
||||
Sort: []string{"fans"},
|
||||
})
|
||||
if ec != errcode.ErrCodeAccountRelationSrvOk {
|
||||
logger.Error("OpGetStreamerList fail, ec: %v", ec)
|
||||
return fmt.Sprintf("OpGetStreamerList fail, ec: %v", ec)
|
||||
if err != nil {
|
||||
logger.Error("OpList fail, err: %v", err)
|
||||
return fmt.Sprintf("OpList fail, err: %v", err)
|
||||
}
|
||||
midList := make([]int64, len(list))
|
||||
for i, streamer := range list {
|
||||
midList[i] = util.DerefInt64(streamer.Mid)
|
||||
}
|
||||
err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", midList, 0)
|
||||
|
||||
err = redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", midList, 0)
|
||||
if err != nil {
|
||||
logger.Error("Redis cache fail, err: %v", err)
|
||||
return fmt.Sprintf("Redis cache fail, err: %v", err)
|
||||
|
@ -387,3 +388,38 @@ func (s *CronService) CancelAccountsAtDueTime(ctx context.Context, param *xxl.Ru
|
|||
|
||||
return "Account cancellation execution finished..."
|
||||
}
|
||||
|
||||
func (s *CronService) ReloadMomentRecommList(ctx context.Context, param *xxl.RunReq) (msg string) {
|
||||
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
|
||||
logger.Info("Refreshing recommendation list cached in redis...")
|
||||
list, err := _DefaultMoment.OpList(&gin.Context{}, &momentproto.OpListReq{
|
||||
CtLowerBound: goproto.Int64(0),
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("OpGetMomentList fail, err: %v", err)
|
||||
return fmt.Sprintf("OpGetMomentList fail, err: %v", err)
|
||||
}
|
||||
|
||||
// 清缓存
|
||||
if err := redis.GetRedisClient().Del(consts.RedisMomentPrefix + "recomm_list"); err != nil {
|
||||
logger.Error("Del redis cache fail, err: %v", err)
|
||||
return fmt.Sprintf("Del redis cache fail, err: %v", err)
|
||||
}
|
||||
if err := redis.GetRedisClient().Del(consts.RedisMomentPrefix + "recent_list_offset"); err != nil {
|
||||
logger.Error("Del redis cache fail, err: %v", err)
|
||||
return fmt.Sprintf("Del redis cache fail, err: %v", err)
|
||||
}
|
||||
|
||||
// 加载缓存
|
||||
for _, moment := range list {
|
||||
id := util.DerefInt64(moment.Id)
|
||||
err := redis.GetRedisClient().RPush(consts.RedisMomentPrefix+"recomm_list", id)
|
||||
if err != nil {
|
||||
logger.Error("Redis cache fail, err: %v", err)
|
||||
return fmt.Sprintf("Redis cache fail, err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info("Refresh moment recommendation list cached in redis accomplished...")
|
||||
return "Refresh moment recommendation list cached in redis accomplished"
|
||||
}
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
package dbstruct
|
||||
|
||||
type UserMomentVisitOffset struct {
|
||||
Id int64 `json:"id" bson:"_id"` // id,用户的mid
|
||||
MomentRecommOffset int64 `json:"moment_recomm_offset" bson:"moment_recomm_offset"` // 动态推荐列表偏移量
|
||||
BottomFlag *int64 `json:"bottom_flag" bson:"bottom_flag"` // 是否触底标志
|
||||
Ver int64 `json:"ver" bson:"ver"` // 乐观锁
|
||||
}
|
||||
|
||||
func (p *UserMomentVisitOffset) GetBottomFlag() int64 {
|
||||
if p == nil || p.BottomFlag == nil {
|
||||
return 0
|
||||
}
|
||||
return *p.BottomFlag
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
package dbstruct
|
||||
|
||||
type UserVisitOffset struct {
|
||||
Id int64 `json:"id" bson:"_id"` // id,用户的mid
|
||||
StreamerRecommOffset int64 `json:"streamer_recomm_offset" bson:"streamer_recomm_offset"` // 主播推荐列表偏移量
|
||||
BottomFlag *int64 `json:"bottom_flag" bson:"bottom_flag"` // 是否触底标志
|
||||
Ver int64 `json:"ver" bson:"ver"` // 乐观锁
|
||||
}
|
||||
|
||||
func (p *UserVisitOffset) GetBottomFlag() int64 {
|
||||
if p == nil || p.BottomFlag == nil {
|
||||
return 0
|
||||
}
|
||||
return *p.BottomFlag
|
||||
}
|
|
@ -300,6 +300,12 @@ func (c *Client) HGetAll(key string, val interface{}) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// HINCRBY 将保存的整型键值增加
|
||||
func (c *Client) HIncrby(key string, field string, val int) (reply interface{}, err error) {
|
||||
reply, err = c.Do("HINCRBY", c.getKey(key), field, val)
|
||||
return
|
||||
}
|
||||
|
||||
/**
|
||||
Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)
|
||||
**/
|
||||
|
@ -476,7 +482,7 @@ func (c *Client) LREM(key string, count int, member interface{}) (int, error) {
|
|||
|
||||
// LLen 获取列表的长度
|
||||
func (c *Client) LLen(key string) (int64, error) {
|
||||
return Int64(c.Do("RPOP", c.getKey(key)))
|
||||
return Int64(c.Do("LLEN", c.getKey(key)))
|
||||
}
|
||||
|
||||
// LRange 返回列表 key 中指定区间内的元素,区间以偏移量 start 和 stop 指定。
|
||||
|
@ -487,6 +493,11 @@ func (c *Client) LRange(key string, start, end int) (interface{}, error) {
|
|||
return c.Do("LRANGE", c.getKey(key), start, end)
|
||||
}
|
||||
|
||||
// LIndex 返回列表 key 中指定索引的元素。
|
||||
func (c *Client) LIndexInt64(key string, index int) (int64, error) {
|
||||
return Int64(c.Do("LINDEX", c.getKey(key), index))
|
||||
}
|
||||
|
||||
/**
|
||||
Redis 有序集合和集合一样也是string类型元素的集合,且不允许重复的成员。
|
||||
不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。
|
||||
|
@ -813,3 +824,8 @@ func (c *Client) closePool() {
|
|||
// func init() {
|
||||
// cache.Register("redis", &Client{})
|
||||
// }
|
||||
|
||||
// 是否是空错误
|
||||
func (c *Client) IsErrNil(err error) bool {
|
||||
return err == redis.ErrNil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue