From b408f1ff1616c5e2304f53c1d9d4e427b799843a Mon Sep 17 00:00:00 2001 From: Robin <7434053+warrior_of_light_robin@user.noreply.gitee.com> Date: Fri, 29 Nov 2024 17:30:12 +0800 Subject: [PATCH] by Robin at 20241129 --- api/consts/consts.go | 1 + api/consts/notif_template.go | 8 +-- apollostruct/n_type_map.go | 5 ++ app/mix/dao/mongo.go | 21 +++++++ app/mix/service/apiservice.go | 29 ++++++--- app/mix/service/logic/account.go | 9 +++ app/mix/service/notif_bcst_center.go | 93 ++++++++++++++++++++++++++++ app/mix/service/service.go | 15 +++++ bizcommon/util/util.go | 52 ++++++++++++++++ 9 files changed, 220 insertions(+), 13 deletions(-) create mode 100644 apollostruct/n_type_map.go diff --git a/api/consts/consts.go b/api/consts/consts.go index 6038b2f3..39af41f8 100644 --- a/api/consts/consts.go +++ b/api/consts/consts.go @@ -72,6 +72,7 @@ const ( OfficialEmailKey = "official_email" InstallInfoKey = "install_info" NotifBannerInfoKey = "notif_banner_info" + NTypeListKey = "n_type_list" AcctPunishmentRealEndTimeKey = "acct_punishment_real_endtime" ) diff --git a/api/consts/notif_template.go b/api/consts/notif_template.go index 22dd3a89..c7827e30 100644 --- a/api/consts/notif_template.go +++ b/api/consts/notif_template.go @@ -1,11 +1,11 @@ package consts // notif_template - const ( - Notif_System = 0 // 系统消息 - Notif_Audit = 1 // 审核消息 - Notif_Vas = 2 // 付费消息 + Notif_System = 0 // 系统消息 + Notif_Audit = 1 // 审核消息 + Notif_Vas = 2 // 付费消息 + Notif_Activity = 3 // 活动消息 ) const ( CtrlNotifTemp_CancelNotif = 1732523321 // 取消推送通知 diff --git a/apollostruct/n_type_map.go b/apollostruct/n_type_map.go new file mode 100644 index 00000000..705d05d9 --- /dev/null +++ b/apollostruct/n_type_map.go @@ -0,0 +1,5 @@ +package apollostruct + +type NTypeMapCfg struct { + Map map[int64]string `json:"map"` +} diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index ee6868b8..3d6a9ca8 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -1580,6 +1580,27 @@ func (m *Mongo) GetLastHourNewUserFromH5Count(ctx *gin.Context, req *accountprot return count, err } +func (m *Mongo) GetStreamerAccountMids(ctx *gin.Context) ([]int64, error) { + col := m.getColAccount() + list := make([]int64, 0) + + query := qmgo.M{ + "role": qmgo.M{ + "$in": []int64{consts.Streamer, consts.StreamerToBe}, + }, + } + + err := col.Find(ctx, query).Sort("_id").Select("_id").All(&list) + if err == qmgo.ErrNoSuchDocuments { + err = nil + return nil, err + } + if err != nil { + return nil, err + } + return list, err +} + func (m *Mongo) CreateAccountHis(ctx *gin.Context, accounts []*dbstruct.Account) error { col := m.getColAccountHis() _, err := col.InsertMany(ctx, accounts) diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index 6769590e..35903b29 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -4691,7 +4691,9 @@ func (s *Service) ApiReadNotification(ctx *gin.Context, req *notificationproto.A // 以真更新的条数decr key := util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, req.NType) + totalKey := util.GetNotifUrcTotalIdForRedis(req.BaseRequest.Mid) _, err = redis.GetRedisClient().DecrBy(key, result.ModifiedCount) + _, err = redis.GetRedisClient().DecrBy(totalKey, result.ModifiedCount) if err != nil { logger.Error("Redis DecrBy fail, err: %v", err) ec = errcode.ErrCodeNotificationSrvFail @@ -4714,7 +4716,9 @@ func (s *Service) ApiReadAllNotification(ctx *gin.Context, req *notificationprot // 以真更新的条数decr key := util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, req.NType) + totalKey := util.GetNotifUrcTotalIdForRedis(req.BaseRequest.Mid) _, err = redis.GetRedisClient().DecrBy(key, result.ModifiedCount) + _, err = redis.GetRedisClient().DecrBy(totalKey, result.ModifiedCount) if err != nil { logger.Error("Redis DecrBy fail, err: %v", err) ec = errcode.ErrCodeNotificationSrvFail @@ -4724,22 +4728,29 @@ func (s *Service) ApiReadAllNotification(ctx *gin.Context, req *notificationprot return } -func (s *Service) ApiGetNotificationUrcByMid(ctx *gin.Context, req *notificationproto.ApiCountUnreadReq) (mp map[int64]int64, ec errcode.ErrCode) { +func (s *Service) ApiGetNotificationUrcByMid(ctx *gin.Context, req *notificationproto.ApiCountUnreadReq) (mp map[int64]int64, total int64, ec errcode.ErrCode) { ec = errcode.ErrCodeNotificationSrvOk + // 获取apollo配置 + cfg := apollostruct.NTypeMapCfg{} + err := apollo.GetJson("n_type_map", &cfg, apollo.ApolloOpts().SetNamespace("application")) + if err != nil { + logger.Error("Apollo read failed : %v", err) + return make(map[int64]int64), 0, errcode.ErrCodeApolloReadFail + } + // 从redis中读取数据 mp = make(map[int64]int64) if req.NType != nil { nType := util.DerefInt64(req.NType) - total, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, nType)) - mp[nType] = total + stotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, nType)) + mp[nType] = stotal } else { - sysTotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, consts.Notif_System)) - audTotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, consts.Notif_Audit)) - vasTotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, consts.Notif_Vas)) - mp[consts.Notif_System] = sysTotal - mp[consts.Notif_Audit] = audTotal - mp[consts.Notif_Vas] = vasTotal + for k, _ := range cfg.Map { + stotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, k)) + mp[k] = stotal + } + total, _ = redis.GetRedisClient().GetInt64(util.GetNotifUrcTotalIdForRedis(req.BaseRequest.Mid)) } return diff --git a/app/mix/service/logic/account.go b/app/mix/service/logic/account.go index 571320ec..aec94f1f 100644 --- a/app/mix/service/logic/account.go +++ b/app/mix/service/logic/account.go @@ -267,3 +267,12 @@ func (p *Account) OpListByPhoneHash(ctx *gin.Context, phonehash string) ([]*dbst } return list, nil } + +func (p *Account) GetStreamerAccountMids(ctx *gin.Context) ([]int64, error) { + list, err := p.store.GetStreamerAccountMids(ctx) + if err != nil { + logger.Error("GetStreamerAccountMids fail, err: %v", err) + return make([]int64, 0), err + } + return list, nil +} diff --git a/app/mix/service/notif_bcst_center.go b/app/mix/service/notif_bcst_center.go index 740d55df..ea8be1ad 100644 --- a/app/mix/service/notif_bcst_center.go +++ b/app/mix/service/notif_bcst_center.go @@ -1,10 +1,14 @@ package service import ( + "service/api/adapter/firenzeapi" "service/api/consts" + "service/api/errcode" + firenzeproto "service/api/proto/firenze" "service/bizcommon/util" "service/dbstruct" "service/library/logger" + "service/library/melody" "service/library/redis" "github.com/gin-gonic/gin" @@ -51,6 +55,7 @@ func (s *NotifBcstCenter) BcstNotifs(ctx *gin.Context, nids []int64, objType int } return err } + err = _DefaultNotification.OpUpdateByIds(ctx, &dbstruct.Notification{ Status: goproto.Int64(consts.Notification_Pushed), }, nids) @@ -58,6 +63,9 @@ func (s *NotifBcstCenter) BcstNotifs(ctx *gin.Context, nids []int64, objType int logger.Error("_DefaultNotification OpUpdateByIds fail, err: %v", err) return err } + + // 长链接广播 + return nil } @@ -119,6 +127,7 @@ func (s *NotifBcstCenter) pushNotifsToMids(ctx *gin.Context, nids []int64, objMi for _, mid := range objMids { for nType, total := range nTypeTotalMap { _, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcIdForRedis(mid, nType), total) + _, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcTotalIdForRedis(mid), total) if err != nil { logger.Error("Redis IncrBy fail, err: %v", err) return err @@ -222,6 +231,7 @@ func (s *NotifBcstCenter) pullAllBcstedNotifs(ctx *gin.Context, vers, receiveVer // 记录未读总数 for nType, total := range nTypeTotalMap { _, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcIdForRedis(objMid, nType), total) + _, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcTotalIdForRedis(objMid), total) if err != nil { logger.Error("Redis IncrBy fail, err: %v", err) return err @@ -230,3 +240,86 @@ func (s *NotifBcstCenter) pullAllBcstedNotifs(ctx *gin.Context, vers, receiveVer return nil } + +func (s *NotifBcstCenter) bcstThroughWebsocket(ctx *gin.Context, nids []int64, objType int64, objMids []int64) error { + + // 查询最近nid的消息内容 + notif, err := _DefaultNotification.GetListById(ctx, util.Int64SliceMax(nids)) + if err != nil { + logger.Error("_DefaultNotification GetListById fail, err: %v", err) + return err + } + urc := int64(len(nids)) + + // 根据通知类型进行广播 + switch objType { + case consts.Notification_ObjType_Customized: + return s.bcstToMidsThroughWebsocket(ctx, notif, urc, objMids) + case consts.Notification_ObjType_AllStreamer: + + case consts.Notification_ObjType_AllUser: + + case consts.Notification_ObjType_AllStreamerAndUser: + + default: + return nil + } +} + +func (s *NotifBcstCenter) bcstToMidsThroughWebsocket(ctx *gin.Context, notif *dbstruct.Notification, urc int64, objMids []int64) error { + for _, mid := range objMids { + total, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcTotalIdForRedis(mid)) + param := &firenzeproto.SendBizMsgParam{ + Mid: mid, + } + msg := melody.BizMsg{ + Type: firenzeproto.MsgTypeSysNotifyTab, + Data: map[string]any{ + "unread_cnt": total + urc, + "notif": notif, + }, + } + param.Msg = util.MustMarshal(msg) + resp, err := firenzeapi.SendBizMsg(param) + if err != nil { + logger.Error("SendBizMsg fail, resp: %v, err: %v", util.ToJson(resp), err) + } + if resp.ErrCode != errcode.ErrCodeOk { + logger.Error("SendBizMsg fail, invalid resp, resp: %v, err: %v", util.ToJson(resp), err) + } + } + return nil +} + +func (s *NotifBcstCenter) groupOnlineMids(ctx *gin.Context) (streamerMids []int64, userMids []int64, err error) { + + streamerMids = make([]int64, 0) + userMids = make([]int64, 0) + + // 获取到所有在线用户 + param := &firenzeproto.OnlineMidsParam{} + resp, err := firenzeapi.OnlineMids(param) + if err != nil { + logger.Error("OnlineMids fail, resp: %v, err: %v", util.ToJson(resp), err) + return + } + if resp.ErrCode != errcode.ErrCodeOk { + logger.Error("OnlineMids fail, invalid resp, resp: %v, err: %v", util.ToJson(resp), err) + return + } + + smids := make([]int64, 0) + smids, err = _DefaultAccount.GetStreamerAccountMids(ctx) + if err != nil { + logger.Error("GetStreamerAccountMids fail, resp: %v, err: %v", util.ToJson(resp), err) + return + } + + // 切片 + for i := 0; i < len(resp.Data.Mids); i += 5000 { + upper := util.Int64Min(int64(len(resp.Data.Mids)), int64(i+5000-1)) + mids := resp.Data.Mids[i:upper] + + } + return +} diff --git a/app/mix/service/service.go b/app/mix/service/service.go index e7fa375b..ddd0989e 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -5324,6 +5324,8 @@ func (s *Service) OpReloadNotificationUrc(ctx *gin.Context, req *base.BaseReques if err != nil { logger.Error("OpGetUrc fail, req: %v, err: %v", util.ToJson(req), err) } + + midMp := make(map[int64]*dbstruct.Account) for _, mp := range results { idMp := mp["_id"].(map[string]any) count := mp["count"].(int32) @@ -5334,6 +5336,19 @@ func (s *Service) OpReloadNotificationUrc(ctx *gin.Context, req *base.BaseReques if err != nil { logger.Error("Redis Set fail, err: %v", err) } + + if midMp[mid] == nil { + midMp[mid] = &dbstruct.Account{} + err = redis.GetRedisClient().Set(util.GetNotifUrcTotalIdForRedis(mid), int64(count), 0) + if err != nil { + logger.Error("Redis Set fail, err: %v", err) + } + } else { + _, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcTotalIdForRedis(mid), int64(count)) + if err != nil { + logger.Error("Redis Set fail, err: %v", err) + } + } } }() diff --git a/bizcommon/util/util.go b/bizcommon/util/util.go index 3866d983..509fb9f6 100644 --- a/bizcommon/util/util.go +++ b/bizcommon/util/util.go @@ -455,3 +455,55 @@ func GetNotifScene(key string, option int64) int64 { func GetNotifUrcIdForRedis(mid, nType int64) string { return fmt.Sprintf("%surc_%d_%d", consts.RedisNotificationPrefix, mid, nType) } + +func GetNotifUrcTotalIdForRedis(mid int64) string { + return fmt.Sprintf("%surc_%d", consts.RedisNotificationPrefix, mid) +} + +func Int64Max(x, y int64) int64 { + if x > y { + return x + } + return y +} + +func Int64Min(x, y int64) int64 { + if x < y { + return x + } + return y +} + +func Int64SliceMax(s []int64) int64 { + max := int64(math.MinInt64) + for _, v := range s { + max = Int64Max(max, v) + } + return max +} + +func Int64SliceMin(s []int64) int64 { + min := int64(math.MaxInt64) + for _, v := range s { + min = Int64Min(min, v) + } + return min +} + +// 将升序排序后的int64数组切分成含s2元素的和不含s2元素的 +func SplitSortedInt64Slice(bs []int64, rs []int64) ([]int64, []int64) { + + if len(bs) == 0 || len(rs) == 0 { + return bs, make([]int64, 0) + } + + s1 := make([]int64, 0) + s2 := make([]int64, 0) + + rindex := 0 + for _, e := range bs { + if e < rs[rindex] { + continue + } + } +}