diff --git a/api/proto/streamer/proto/streamer_api.go b/api/proto/streamer/proto/streamer_api.go index d36ae003..b496d186 100644 --- a/api/proto/streamer/proto/streamer_api.go +++ b/api/proto/streamer/proto/streamer_api.go @@ -2,6 +2,7 @@ package proto import ( "service/api/base" + "service/bizcommon/util" "service/dbstruct" ) @@ -193,3 +194,25 @@ type ApiRecommListResp struct { base.BaseResponse Data *ApiRecommListData `json:"data"` } + +// api 推荐 +type ApiFilterReq struct { + base.BaseRequest + Age *util.Int64Filter `json:"age"` + Fans *util.Int64Filter `json:"fans"` + Height *util.Int64Filter `json:"height"` + Weight *util.Int64Filter `json:"weight"` + City *string `json:"city"` + Constellation *string `json:"constellation"` + IsActiveWithinAWeek *int64 `json:"is_active_within_a_week"` +} + +type ApiFilterData struct { + StreamerList []*ApiListExtVO `json:"streamer_list"` + RecommList []*ApiListExtVO `json:"recomm_list"` +} + +type ApiFilterResp struct { + base.BaseResponse + Data *ApiFilterData `json:"data"` +} diff --git a/api/proto/streamer_acct/proto/streamer_acct_op.go b/api/proto/streamer_acct/proto/streamer_acct_op.go index 85d62830..89b034d3 100644 --- a/api/proto/streamer_acct/proto/streamer_acct_op.go +++ b/api/proto/streamer_acct/proto/streamer_acct_op.go @@ -1,6 +1,9 @@ package proto -import "service/api/base" +import ( + "service/api/base" + "service/bizcommon/util" +) // 根据UserId模糊查询(和姓名取并集) type OpListFuzzilyByUserIdReq struct { @@ -37,3 +40,27 @@ type OpListFuzzilyByNameResp struct { base.BaseResponse Data *OpListFuzzilyByNameData `json:"data"` } + +// 筛选 +type OpFilterReq struct { + base.BaseRequest + Age *util.Int64Filter `json:"age"` + Fans *util.Int64Filter `json:"fans"` + Height *util.Int64Filter `json:"height"` + Weight *util.Int64Filter `json:"weight"` + City *string `json:"city"` + Constellation *string `json:"constellation"` + IsActiveWithinAWeek *int64 `json:"is_active_within_a_week"` + Offset int `json:"offset"` + Limit int `json:"limit"` + Sort []string + BlockedFromBeingSearchedList []string +} + +type OpFilterData struct { +} + +type OpFilterResp struct { + base.BaseResponse + Data *OpListFuzzilyByNameData `json:"data"` +} diff --git a/app/mix/controller/init.go b/app/mix/controller/init.go index 540b07bd..e65448e0 100644 --- a/app/mix/controller/init.go +++ b/app/mix/controller/init.go @@ -168,6 +168,7 @@ func Init(r *gin.Engine) { apiStreamerGroup.POST("list_ext_fuzzily_by_name", middleware.JSONParamValidator(streamerproto.ApiListExtFuzzilyByNameReq{}), middleware.JwtAuthenticator(), ApiGetStreamerExtListFuzzilyByName) apiStreamerGroup.POST("list_wx_id", middleware.JSONParamValidator(streamerproto.ApiListStreamerWxIdReq{}), middleware.JwtAuthenticator(), ApiGetStreamerWxId) apiStreamerGroup.POST("recomm_list", middleware.JSONParamValidator(streamerproto.ApiRecommListReq{}), middleware.JwtAuthenticator(), ApiGetStreamerRecommList) + apiStreamerGroup.POST("filter", middleware.JSONParamValidator(streamerproto.ApiFilterReq{}), middleware.JwtAuthenticator(), ApiFilterStreamer) // 意见反馈 apiFeedbackGroup := r.Group("/api/feedback", PrepareToC()) diff --git a/app/mix/controller/streamer_api.go b/app/mix/controller/streamer_api.go index 25451200..8a439e17 100644 --- a/app/mix/controller/streamer_api.go +++ b/app/mix/controller/streamer_api.go @@ -238,3 +238,39 @@ func ApiGetStreamerRecommList(ctx *gin.Context) { } ReplyOk(ctx, data) } + +func ApiFilterStreamer(ctx *gin.Context) { + req := ctx.MustGet("client_req").(*streamerproto.ApiFilterReq) + + streamerlist, recommlist, ec := service.DefaultService.ApiFilterStreamer(ctx, req) + if ec != errcode.ErrCodeStreamerSrvOk { + logger.Error("ApiFilterStreamer fail, req: %v, ec: %v", util.ToJson(req), ec) + ReplyErrCodeMsg(ctx, ec) + return + } + + objectMediaNum := 4 // 单个主播服务总共4个媒体类 + mediaFillableList := make([]mediafiller.MediaFillable, len(streamerlist)*objectMediaNum) + for i, vo := range streamerlist { + mediaFillableList[objectMediaNum*i+0] = vo.Avatar + mediaFillableList[objectMediaNum*i+1] = vo.Cover + mediaFillableList[objectMediaNum*i+2] = vo.Shorts + mediaFillableList[objectMediaNum*i+3] = vo.Album + } + mediafiller.FillList(ctx, mediaFillableList) + + mediaFillableList = make([]mediafiller.MediaFillable, len(recommlist)*objectMediaNum) + for i, vo := range recommlist { + mediaFillableList[objectMediaNum*i+0] = vo.Avatar + mediaFillableList[objectMediaNum*i+1] = vo.Cover + mediaFillableList[objectMediaNum*i+2] = vo.Shorts + mediaFillableList[objectMediaNum*i+3] = vo.Album + } + mediafiller.FillList(ctx, mediaFillableList) + + data := &streamerproto.ApiFilterData{ + StreamerList: streamerlist, + RecommList: recommlist, + } + ReplyOk(ctx, data) +} diff --git a/app/mix/dao/elasticsearch.go b/app/mix/dao/elasticsearch.go index 9db99118..0573e060 100644 --- a/app/mix/dao/elasticsearch.go +++ b/app/mix/dao/elasticsearch.go @@ -8,6 +8,7 @@ import ( "service/dbstruct" "service/library/elasticsearchdb" "service/library/logger" + "time" "unicode" "service/api/consts" @@ -40,7 +41,7 @@ func NewElasticSearch(cfg *conf.ConfigSt) (es *ElasticSearch, err error) { } const ( - IndexStreamerAcct = "streamer_acct" + IndexStreamerAcct = "new_streamer_acct" TypeStreamerAcct = "_doc" ) @@ -66,6 +67,21 @@ func (es *ElasticSearch) UpdateStreamerAcct(ctx *gin.Context, streameracct *dbst return err } +func (es *ElasticSearch) UpdateStreamerAcctSelectively(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcctUpdater) error { + _, err := es.clientMix.Update().Index(es.getIndexStreamerAcct()).Id(fmt.Sprint(streameracct.GetMid())).Doc(util.EntityToM(streameracct)).Do(ctx) + return err +} + +func (es *ElasticSearch) UpdateStreamerAcctSelectivelyByMids(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcctUpdater, mids []int64) error { + mp := util.EntityToM(streameracct) + bulk := es.clientMix.Bulk().Index(es.getIndexStreamerAcct()).Refresh("true") + for _, mid := range mids { + bulk.Add(elastic.NewBulkUpdateRequest().Id(fmt.Sprint(mid)).Doc(mp)) + } + _, err := bulk.Do(ctx) + return err +} + func (es *ElasticSearch) GetStreamerAcctListFuzzilyByName(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByNameReq) (list []*dbstruct.EsStreamerAcct, err error) { list = make([]*dbstruct.EsStreamerAcct, 0) @@ -157,3 +173,69 @@ func (es *ElasticSearch) GetStreamerAcctListFuzzilyByUserId(ctx *gin.Context, re return } + +func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streameracctproto.OpFilterReq) (list []*dbstruct.EsStreamerAcct, err error) { + + list = make([]*dbstruct.EsStreamerAcct, 0) + + delFlagClause := elastic.NewMatchQuery("del_flag", 0) + mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...) + + queryClauses := make([]elastic.Query, 0) + queryClauses = append(queryClauses, delFlagClause) + // 年龄 + ageClause := util.CreateEsInt64RangeQuery("age", req.Age) + if ageClause != nil { + queryClauses = append(queryClauses, ageClause) + } + // 粉丝量 + fansClause := util.CreateEsInt64RangeQuery("fans", req.Fans) + if fansClause != nil { + queryClauses = append(queryClauses, fansClause) + } + // 身高 + heightClause := util.CreateEsInt64RangeQuery("age", req.Height) + if heightClause != nil { + queryClauses = append(queryClauses, heightClause) + } + // 体重 + weightClause := util.CreateEsInt64RangeQuery("age", req.Weight) + if weightClause != nil { + queryClauses = append(queryClauses, weightClause) + } + // 城市 + if req.City != nil { + queryClauses = append(queryClauses, elastic.NewMatchQuery("city", util.DerefString(req.City))) + } + // 星座 + if req.Constellation != nil { + queryClauses = append(queryClauses, elastic.NewMatchQuery("constellation", util.DerefString(req.Constellation))) + } + // 是否7日内有更新 + if req.IsActiveWithinAWeek != nil { + isActiveWithinAWeek := util.DerefInt64(req.IsActiveWithinAWeek) + if isActiveWithinAWeek == 1 { + daystartBeforeAWeek := util.GetDayStartTimeStamp(time.Now()) - 7*86400 // 这是7日前的日始整点时间戳 + queryClauses = append(queryClauses, elastic.NewRangeQuery("last_zone_moment_create_day_start").Gte(daystartBeforeAWeek)) + } + } + + query := elastic.NewBoolQuery().Must(queryClauses...).MustNot(mustnotClause) + res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx) + if err != nil { + logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err) + return + } + + for _, hit := range res.Hits.Hits { + streameracct := &dbstruct.EsStreamerAcct{} + err = json.Unmarshal(hit.Source, streameracct) + if err != nil { + logger.Error("json Unmarshal fail, err: %v", err) + return + } + list = append(list, streameracct) + } + + return +} diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index 54d27643..67d4be76 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -5058,6 +5058,23 @@ func (m *Mongo) GetZidsByZoneMomentIds(ctx *gin.Context, zonemomentIds []int64) return zids, err } +func (m *Mongo) GetMidsByZoneMomentIds(ctx *gin.Context, zonemomentIds []int64) ([]int64, error) { + col := m.getColZoneMoment() + query := qmgo.M{ + "_id": qmgo.M{ + "$in": zonemomentIds, + }, + "del_flag": 0, + } + mids := make([]int64, 0) + err := col.Find(ctx, query).Distinct("mid", &mids) + if err == qmgo.ErrNoSuchDocuments { + err = nil + return make([]int64, 0), err + } + return mids, err +} + func (m *Mongo) HeadZoneMomentByIds(ctx *gin.Context, ids []int64, opType int64) error { col := m.getColZoneMoment() query := qmgo.M{ diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index 3768c48b..f6535e34 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -1447,6 +1447,19 @@ func (s *Service) ApiUpdateStreamer(ctx *gin.Context, req *streamerproto.ApiUpda logger.Error("UpdateWechat fail, mid: %v, err: %v", err) return } + // 更新es + if err := _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: req.Streamer.Mid, + Age: req.Streamer.Age, + Height: req.Streamer.Height, + Weight: req.Streamer.Weight, + City: req.Streamer.City, + Constellation: req.Streamer.Constellation, + }); err != nil { + logger.Error("OpUpdateSelectively fail, req: %v, err: %v", util.ToJson(req), err) + ec = errcode.ErrCodeStreamerSrvFail + return + } return } @@ -1765,6 +1778,58 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto. return } +func (s *Service) ApiFilterStreamer(ctx *gin.Context, req *streamerproto.ApiFilterReq) (streamerlist []*streamerproto.ApiListExtVO, recommlist []*streamerproto.ApiListExtVO, ec errcode.ErrCode) { + + ec = errcode.ErrCodeStreamerSrvOk + + // 从redis中获取当前禁止被搜索的主播 + blockedlist := make([]string, 0) + err := redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"blocked_from_being_searched_list", &blockedlist) + if err != nil && !redis.GetRedisClient().IsErrNil(err) { + logger.Error("Redis read failed : %v", err) + return + } + + //1.从主播用户表中模糊匹配所有主播信息的用户侧数据,按mid正序排序 + streameraccts, err := _DefaultStreamerAcct.OpFilterStreamerAcct(ctx, &streameracctproto.OpFilterReq{ + Age: req.Age, + Fans: req.Fans, + Height: req.Height, + Weight: req.Weight, + City: req.City, + Constellation: req.Constellation, + IsActiveWithinAWeek: req.IsActiveWithinAWeek, + Offset: 0, + Limit: 20, + Sort: []string{"_id"}, + BlockedFromBeingSearchedList: blockedlist, + }) + if err != nil { + logger.Error("Account OpFilterStreamerAcct fail, req: %v, err: %v", util.ToJson(req), err) + ec = errcode.ErrCodeAccountSrvFail + return + } + + //2.获取mids + mids := make([]int64, 0) + for _, v := range streameraccts { + mids = append(mids, v.Mid) + } + + mp, err := s.utilGetStreamerExtMapByMids(ctx, mids, consts.InterfaceType_Api) + if err != nil { + logger.Error("utilGetStreamerExtMapByMids fail, err: %v", err) + return + } + streamerlist = make([]*streamerproto.ApiListExtVO, 0) + for _, mid := range mids { + vo, _ := mp[mid].(*streamerproto.ApiListExtVO) + streamerlist = append(streamerlist, vo) + } + + return +} + // Feedback func (s *Service) ApiCreateFeedback(ctx *gin.Context, req *feedbackproto.ApiCreateReq) (ec errcode.ErrCode) { ec = errcode.ErrCodeFeedbackSrvOk @@ -3156,11 +3221,23 @@ func (s *Service) ApiDeleteZoneMoment(ctx *gin.Context, req *zonemomentproto.Api LastZoneMomentCt: goproto.Int64(0), }, }, zone.GetLastZoneMomentCt()) + if err == qmgo.ErrNoSuchDocuments { + return + } if err != nil { logger.Error("OpUpdate fail, err: %v", err) ec = errcode.ErrCodeZoneMomentSrvFail return } + err = _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: zonemoment.Mid, + LastZoneMomentCreateDayStart: goproto.Int64(0), + }) + if err != nil { + logger.Error("OpUpdateSelectively fail, err: %v", err) + ec = errcode.ErrCodeZoneMomentSrvFail + return + } } else { //还有动态,看最后一条更新的动态的更新时间是否比删除的那条动态更早,若更早,则需要把空间的更新时间回拨 lastzonemoment := list[0] @@ -3171,11 +3248,23 @@ func (s *Service) ApiDeleteZoneMoment(ctx *gin.Context, req *zonemomentproto.Api LastZoneMomentCt: lastzonemoment.Ut, }, }, zone.GetLastZoneMomentCt()) + if err == qmgo.ErrNoSuchDocuments { + return + } if err != nil { logger.Error("OpUpdate fail, err: %v", err) ec = errcode.ErrCodeZoneMomentSrvFail return } + err = _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: zonemoment.Mid, + LastZoneMomentCreateDayStart: goproto.Int64(util.GetDayStartTimeStamp(time.Unix(lastzonemoment.GetUt(), 0))), + }) + if err != nil { + logger.Error("OpUpdateSelectively fail, err: %v", err) + ec = errcode.ErrCodeZoneMomentSrvFail + return + } } } } diff --git a/app/mix/service/logic/streamer_acct.go b/app/mix/service/logic/streamer_acct.go index 68aa0699..70959ea5 100644 --- a/app/mix/service/logic/streamer_acct.go +++ b/app/mix/service/logic/streamer_acct.go @@ -39,6 +39,24 @@ func (p *StreamerAcct) OpUpdate(ctx *gin.Context, streameracct *dbstruct.EsStrea return nil } +func (p *StreamerAcct) OpUpdateSelectively(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcctUpdater) error { + err := p.store.UpdateStreamerAcctSelectively(ctx, streameracct) + if err != nil { + logger.Error("UpdateStreamerAcctSelectively fail, err: %v", err) + return err + } + return nil +} + +func (p *StreamerAcct) OpUpdateSelectivelyByMids(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcctUpdater, mids []int64) error { + err := p.store.UpdateStreamerAcctSelectivelyByMids(ctx, streameracct, mids) + if err != nil { + logger.Error("UpdateStreamerAcctSelectivelyByMids fail, err: %v", err) + return err + } + return nil +} + func (p *StreamerAcct) OpListStreamerAcctFuzzilyByUserId(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByUserIdReq) ([]*dbstruct.EsStreamerAcct, error) { list, err := p.store.GetStreamerAcctListFuzzilyByUserId(ctx, req) if err != nil { @@ -56,3 +74,12 @@ func (p *StreamerAcct) OpListStreamerAcctFuzzilyByName(ctx *gin.Context, req *st } return list, nil } + +func (p *StreamerAcct) OpFilterStreamerAcct(ctx *gin.Context, req *streameracctproto.OpFilterReq) ([]*dbstruct.EsStreamerAcct, error) { + list, err := p.store.FilterStreamerAcctList(ctx, req) + if err != nil { + logger.Error("FilterStreamerAcctList fail, err: %v", err) + return nil, err + } + return list, nil +} diff --git a/app/mix/service/logic/zonemoment.go b/app/mix/service/logic/zonemoment.go index 9810ef7c..499d82d2 100644 --- a/app/mix/service/logic/zonemoment.go +++ b/app/mix/service/logic/zonemoment.go @@ -170,6 +170,15 @@ func (p *ZoneMoment) GetZidsByZoneMomentIds(ctx *gin.Context, zonemomentIds []in return zids, err } +func (p *ZoneMoment) GetMidsByZoneMomentIds(ctx *gin.Context, zonemomentIds []int64) ([]int64, error) { + mids, err := p.store.GetMidsByZoneMomentIds(ctx, zonemomentIds) + if err != nil { + logger.Error("GetMidsByZoneMomentIds fail, err: %v", err) + return nil, err + } + return mids, err +} + func (p *ZoneMoment) OpHeadByIds(ctx *gin.Context, ids []int64, opType int64) error { err := p.store.HeadZoneMomentByIds(ctx, ids, opType) if err != nil { diff --git a/app/mix/service/service.go b/app/mix/service/service.go index 591f3ad6..1888d466 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -1878,24 +1878,29 @@ func (s *Service) OpApproveStreamerAuthApproval(ctx *gin.Context, req *streamera } // 主播账户表新增数据 - accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{ - Mids: midList, - }) + accountMp, err := _DefaultAccount.GetAccountMapByMids(ctx, midList) if err != nil { logger.Error("_DefaultAccount OpListByMids fail, req: %v, err: %v", util.ToJson(req), err) ec = errcode.ErrCodeAccountSrvFail return } streamerAccts := make([]*dbstruct.EsStreamerAcct, 0) - for _, acct := range accountList { + for _, streamer := range streamerList { + acct := accountMp[streamer.GetMid()] streamerAccts = append(streamerAccts, &dbstruct.EsStreamerAcct{ - Mid: acct.GetMid(), - Name: acct.GetName(), - UserIdString: acct.GetUserIdString(), - PinYin: strings.Join(pinyin.LazyConvert(acct.GetName(), nil), ""), - Ct: acct.GetCt(), - Ut: acct.GetUt(), - DelFlag: acct.GetDelFlag(), + Mid: acct.GetMid(), + Name: acct.GetName(), + UserIdString: acct.GetUserIdString(), + PinYin: strings.Join(pinyin.LazyConvert(acct.GetName(), nil), ""), + Age: streamer.GetAge(), + Height: streamer.GetHeight(), + Weight: streamer.GetWeight(), + City: streamer.GetCity(), + Constellation: streamer.GetConstellation(), + LastZoneMomentCreateDayStart: 0, + Ct: acct.GetCt(), + Ut: acct.GetUt(), + DelFlag: acct.GetDelFlag(), }) } err = _DefaultStreamerAcct.OpCreate(ctx, streamerAccts) @@ -2170,31 +2175,19 @@ func (s *Service) OpApproveStreamerAuthApprovalDetails(ctx *gin.Context, req *st return } } - accounts, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{ - Mids: mids, - }) + accountMp, err := _DefaultAccount.GetAccountMapByMids(ctx, mids) if err != nil { logger.Error("_DefaultAccount OpListByMids fail, req: %v, err: %v", util.ToJson(req), err) ec = errcode.ErrCodeAccountSrvFail return } - // 预填写主播账户表数据 - streamerAccts := make([]*dbstruct.EsStreamerAcct, 0) - for _, acct := range accounts { + // 判断主播是否是准主播 + for _, acct := range accountMp { if acct.GetRole() != consts.StreamerToBe { logger.Error("mid: %v is not a streamer-to-be, req: %v", acct.GetMid(), util.ToJson(req)) ec = errcode.ErrCodeStreamerAuthApprovalDetailsNotAStreamerToBe return } - streamerAccts = append(streamerAccts, &dbstruct.EsStreamerAcct{ - Mid: acct.GetMid(), - Name: acct.GetName(), - UserIdString: acct.GetUserIdString(), - PinYin: strings.Join(pinyin.LazyConvert(acct.GetName(), nil), ""), - Ct: acct.GetCt(), - Ut: acct.GetUt(), - DelFlag: acct.GetDelFlag(), - }) } //2.更新状态 @@ -2231,6 +2224,8 @@ func (s *Service) OpApproveStreamerAuthApprovalDetails(ctx *gin.Context, req *st //3.若是审批通过,将审批表的信息合并进主播表,并更新account表里的角色信息主播 if util.DerefInt64(req.ApproveStatus) == consts.StreamerAuthApprovalDetailsApprove_Passed { + + streamerAccts := make([]*dbstruct.EsStreamerAcct, 0) for _, streamerAuthApprovalDetails := range list { streamer := &dbstruct.Streamer{ Mid: streamerAuthApprovalDetails.Mid, @@ -2250,6 +2245,24 @@ func (s *Service) OpApproveStreamerAuthApprovalDetails(ctx *gin.Context, req *st IsHided: goproto.Int64(0), } + // es搜索引擎数据 + acct := accountMp[streamer.GetMid()] + streamerAccts = append(streamerAccts, &dbstruct.EsStreamerAcct{ + Mid: acct.GetMid(), + Name: acct.GetName(), + UserIdString: acct.GetUserIdString(), + PinYin: strings.Join(pinyin.LazyConvert(acct.GetName(), nil), ""), + Age: streamer.GetAge(), + Height: streamer.GetHeight(), + Weight: streamer.GetWeight(), + City: streamer.GetCity(), + Constellation: streamer.GetConstellation(), + LastZoneMomentCreateDayStart: 0, + Ct: acct.GetCt(), + Ut: acct.GetUt(), + DelFlag: acct.GetDelFlag(), + }) + // 更新user_vas err = _DefaultVas.UpdateWechat(ctx, &vasproto.UpdateWechatReq{ BaseRequest: base.BaseRequest{ @@ -2356,6 +2369,19 @@ func (s *Service) OpUpdateStreamer(ctx *gin.Context, req *streamerproto.OpUpdate ec = errcode.ErrCodeStreamerSrvFail return } + // 更新es + if err := _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: req.Streamer.Mid, + Age: req.Streamer.Age, + Height: req.Streamer.Height, + Weight: req.Streamer.Weight, + City: req.Streamer.City, + Constellation: req.Streamer.Constellation, + }); err != nil { + logger.Error("OpUpdateSelectively fail, req: %v, err: %v", util.ToJson(req), err) + ec = errcode.ErrCodeStreamerSrvFail + return + } return } @@ -4164,11 +4190,23 @@ func (s *Service) OpDeleteZoneMoment(ctx *gin.Context, req *zonemomentproto.OpDe LastZoneMomentCt: goproto.Int64(0), }, }, zone.GetLastZoneMomentCt()) + if err == qmgo.ErrNoSuchDocuments { + return + } if err != nil { logger.Error("OpUpdate fail, err: %v", err) ec = errcode.ErrCodeZoneMomentSrvFail return } + err = _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: zonemoment.Mid, + LastZoneMomentCreateDayStart: goproto.Int64(0), + }) + if err != nil { + logger.Error("OpUpdateSelectively fail, err: %v", err) + ec = errcode.ErrCodeZoneMomentSrvFail + return + } } else { //还有动态,看最后一条更新的动态的更新时间是否比删除的那条动态更早,若更早,则需要把空间的更新时间回拨 lastzonemoment := list[0] @@ -4179,11 +4217,23 @@ func (s *Service) OpDeleteZoneMoment(ctx *gin.Context, req *zonemomentproto.OpDe LastZoneMomentCt: lastzonemoment.Ut, }, }, zone.GetLastZoneMomentCt()) + if err == qmgo.ErrNoSuchDocuments { + return + } if err != nil { logger.Error("OpUpdate fail, err: %v", err) ec = errcode.ErrCodeZoneMomentSrvFail return } + err = _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: zonemoment.Mid, + LastZoneMomentCreateDayStart: lastzonemoment.Ut, + }) + if err != nil { + logger.Error("OpUpdateSelectively fail, err: %v", err) + ec = errcode.ErrCodeZoneMomentSrvFail + return + } } } } @@ -4301,6 +4351,21 @@ func (s *Service) OpReviewZoneMoment(ctx *gin.Context, req *zonemomentproto.OpRe ec = errcode.ErrCodeZoneMomentSrvFail return } + daystart := util.GetDayStartTimeStamp(time.Unix(time.Now().Unix(), 0)) + mids, err := _DefaultZoneMoment.GetMidsByZoneMomentIds(ctx, req.ZoneMomentIds) + if err != nil { + logger.Error("_DefaultZoneMoment GetMidsByZoneMomentIds fail, req: %v, err: %v", util.ToJson(req), err) + ec = errcode.ErrCodeZoneMomentSrvFail + return + } + err = _DefaultStreamerAcct.OpUpdateSelectivelyByMids(ctx, &dbstruct.EsStreamerAcctUpdater{ + LastZoneMomentCreateDayStart: goproto.Int64(daystart), + }, mids) + if err != nil { + logger.Error("_DefaultStreamerAcct OpUpdateSelectivelyByMids fail, req: %v, err: %v", util.ToJson(req), err) + ec = errcode.ErrCodeZoneMomentSrvFail + return + } // 增加空间内统计总数 for _, id := range req.ZoneMomentIds { diff --git a/bizcommon/util/util.go b/bizcommon/util/util.go index d8899cbb..1d54396c 100644 --- a/bizcommon/util/util.go +++ b/bizcommon/util/util.go @@ -16,6 +16,7 @@ import ( "time" "unsafe" + "github.com/olivere/elastic/v7" "github.com/qiniu/qmgo" ) @@ -373,3 +374,22 @@ func GetLastLessOrEqualForFloat64(arr []float64, target float64) int { return result } + +type Int64Filter struct { + LowerBound *int64 `json:"lower_bound" bson:"lower_bound"` + UpperBound *int64 `json:"upper_bound" bson:"upper_bound"` +} + +func CreateEsInt64RangeQuery(name string, filter *Int64Filter) *elastic.RangeQuery { + if filter == nil { + return nil + } + query := elastic.NewRangeQuery(name) + if filter.LowerBound != nil { + query.Gte(DerefInt64(filter.LowerBound)) + } + if filter.UpperBound != nil { + query.Lte(DerefInt64(filter.UpperBound)) + } + return query +} diff --git a/dbstruct/streamer.go b/dbstruct/streamer.go index 7441cb78..69907dfc 100644 --- a/dbstruct/streamer.go +++ b/dbstruct/streamer.go @@ -57,3 +57,35 @@ func (p *Streamer) GetAutoResponseMessage() string { } return *p.AutoResponseMessage } + +func (p *Streamer) GetAge() int64 { + if p == nil || p.Age == nil { + return 0 + } + return *p.Age +} + +func (p *Streamer) GetHeight() int64 { + if p == nil || p.Height == nil { + return 0 + } + return *p.Height +} +func (p *Streamer) GetWeight() int64 { + if p == nil || p.Weight == nil { + return 0 + } + return *p.Weight +} +func (p *Streamer) GetConstellation() string { + if p == nil || p.Constellation == nil { + return "" + } + return *p.Constellation +} +func (p *Streamer) GetCity() string { + if p == nil || p.City == nil { + return "" + } + return *p.City +} diff --git a/dbstruct/streamer_acct.go b/dbstruct/streamer_acct.go index 10a1bd9a..de502b27 100644 --- a/dbstruct/streamer_acct.go +++ b/dbstruct/streamer_acct.go @@ -1,11 +1,37 @@ package dbstruct type EsStreamerAcct struct { - Mid int64 `json:"id"` // 用户表Id - Name string `json:"name"` // 用户名 - UserIdString string `json:"user_id_string"` // string型user_id,为模糊匹配设置 - PinYin string `json:"pinyin"` // 拼音 - Ct int64 `json:"ct"` // 创建时间 - Ut int64 `json:"ut"` // 更新时间 - DelFlag int64 `json:"del_flag"` // 删除标记,0-否,1-是 + Mid int64 `json:"id"` // 用户表Id + Name string `json:"name"` // 用户名 + UserIdString string `json:"user_id_string"` // string型user_id,为模糊匹配设置 + PinYin string `json:"pinyin"` // 拼音 + Age int64 `json:"age"` // 年龄 + Height int64 `json:"height"` // 身高 + Weight int64 `json:"weight"` // 体重 + City string `json:"city"` // 所在城市 + Constellation string `json:"constellation"` // 星座 + LastZoneMomentCreateDayStart int64 `json:"last_zone_moment_create_day_start"` // 最后空间动态创建日始整点 + Ct int64 `json:"ct"` // 创建时间 + Ut int64 `json:"ut"` // 更新时间 + DelFlag int64 `json:"del_flag"` // 删除标记,0-否,1-是 +} + +type EsStreamerAcctUpdater struct { + Mid *int64 `json:"id" bson:"_id"` // 用户表Id + Name *string `json:"name" bson:"name"` // 用户名 + UserIdString *string `json:"user_id_string" bson:"user_id_string"` // string型user_id,为模糊匹配设置 + PinYin *string `json:"pinyin" bson:"pinyin"` // 拼音 + Age *int64 `json:"age" bson:"age"` // 年龄 + Height *int64 `json:"height" bson:"height"` // 身高 + Weight *int64 `json:"weight" bson:"weight"` // 体重 + City *string `json:"city" bson:"city"` // 所在城市 + Constellation *string `json:"constellation" bson:"constellation"` // 星座 + LastZoneMomentCreateDayStart *int64 `json:"last_zone_moment_create_day_start" bson:"last_zone_moment_create_day_start"` // 最后空间动态创建日始整点 +} + +func (p *EsStreamerAcctUpdater) GetMid() int64 { + if p == nil || p.Mid == nil { + return 0 + } + return *p.Mid } diff --git a/library/taginterceptor/crypt_test.go b/library/taginterceptor/crypt_test.go index 4f10daf0..6a46bbd8 100644 --- a/library/taginterceptor/crypt_test.go +++ b/library/taginterceptor/crypt_test.go @@ -32,7 +32,7 @@ func Test(t *testing.T) { // fmt.Println(string(phone)) // hash := mycrypto.CryptoServiceInstance().SHA256.Encrypt(phone) // fmt.Println(string(hash)) - mobilePhone := "13728423678" + mobilePhone := "15051772749" rsaBytes, _ := mycrypto.CryptoServiceInstance().AES.Encrypt([]byte(mobilePhone)) base64EncryptedBytes := make([]byte, base64.StdEncoding.EncodedLen(len(rsaBytes))) base64.StdEncoding.Encode(base64EncryptedBytes, rsaBytes)