by Robin at 20241129
This commit is contained in:
parent
93bdb89080
commit
b408f1ff16
|
@ -72,6 +72,7 @@ const (
|
|||
OfficialEmailKey = "official_email"
|
||||
InstallInfoKey = "install_info"
|
||||
NotifBannerInfoKey = "notif_banner_info"
|
||||
NTypeListKey = "n_type_list"
|
||||
AcctPunishmentRealEndTimeKey = "acct_punishment_real_endtime"
|
||||
)
|
||||
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
package consts
|
||||
|
||||
// notif_template
|
||||
|
||||
const (
|
||||
Notif_System = 0 // 系统消息
|
||||
Notif_Audit = 1 // 审核消息
|
||||
Notif_Vas = 2 // 付费消息
|
||||
Notif_System = 0 // 系统消息
|
||||
Notif_Audit = 1 // 审核消息
|
||||
Notif_Vas = 2 // 付费消息
|
||||
Notif_Activity = 3 // 活动消息
|
||||
)
|
||||
const (
|
||||
CtrlNotifTemp_CancelNotif = 1732523321 // 取消推送通知
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
package apollostruct
|
||||
|
||||
type NTypeMapCfg struct {
|
||||
Map map[int64]string `json:"map"`
|
||||
}
|
|
@ -1580,6 +1580,27 @@ func (m *Mongo) GetLastHourNewUserFromH5Count(ctx *gin.Context, req *accountprot
|
|||
return count, err
|
||||
}
|
||||
|
||||
func (m *Mongo) GetStreamerAccountMids(ctx *gin.Context) ([]int64, error) {
|
||||
col := m.getColAccount()
|
||||
list := make([]int64, 0)
|
||||
|
||||
query := qmgo.M{
|
||||
"role": qmgo.M{
|
||||
"$in": []int64{consts.Streamer, consts.StreamerToBe},
|
||||
},
|
||||
}
|
||||
|
||||
err := col.Find(ctx, query).Sort("_id").Select("_id").All(&list)
|
||||
if err == qmgo.ErrNoSuchDocuments {
|
||||
err = nil
|
||||
return nil, err
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return list, err
|
||||
}
|
||||
|
||||
func (m *Mongo) CreateAccountHis(ctx *gin.Context, accounts []*dbstruct.Account) error {
|
||||
col := m.getColAccountHis()
|
||||
_, err := col.InsertMany(ctx, accounts)
|
||||
|
|
|
@ -4691,7 +4691,9 @@ func (s *Service) ApiReadNotification(ctx *gin.Context, req *notificationproto.A
|
|||
|
||||
// 以真更新的条数decr
|
||||
key := util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, req.NType)
|
||||
totalKey := util.GetNotifUrcTotalIdForRedis(req.BaseRequest.Mid)
|
||||
_, err = redis.GetRedisClient().DecrBy(key, result.ModifiedCount)
|
||||
_, err = redis.GetRedisClient().DecrBy(totalKey, result.ModifiedCount)
|
||||
if err != nil {
|
||||
logger.Error("Redis DecrBy fail, err: %v", err)
|
||||
ec = errcode.ErrCodeNotificationSrvFail
|
||||
|
@ -4714,7 +4716,9 @@ func (s *Service) ApiReadAllNotification(ctx *gin.Context, req *notificationprot
|
|||
|
||||
// 以真更新的条数decr
|
||||
key := util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, req.NType)
|
||||
totalKey := util.GetNotifUrcTotalIdForRedis(req.BaseRequest.Mid)
|
||||
_, err = redis.GetRedisClient().DecrBy(key, result.ModifiedCount)
|
||||
_, err = redis.GetRedisClient().DecrBy(totalKey, result.ModifiedCount)
|
||||
if err != nil {
|
||||
logger.Error("Redis DecrBy fail, err: %v", err)
|
||||
ec = errcode.ErrCodeNotificationSrvFail
|
||||
|
@ -4724,22 +4728,29 @@ func (s *Service) ApiReadAllNotification(ctx *gin.Context, req *notificationprot
|
|||
return
|
||||
}
|
||||
|
||||
func (s *Service) ApiGetNotificationUrcByMid(ctx *gin.Context, req *notificationproto.ApiCountUnreadReq) (mp map[int64]int64, ec errcode.ErrCode) {
|
||||
func (s *Service) ApiGetNotificationUrcByMid(ctx *gin.Context, req *notificationproto.ApiCountUnreadReq) (mp map[int64]int64, total int64, ec errcode.ErrCode) {
|
||||
ec = errcode.ErrCodeNotificationSrvOk
|
||||
|
||||
// 获取apollo配置
|
||||
cfg := apollostruct.NTypeMapCfg{}
|
||||
err := apollo.GetJson("n_type_map", &cfg, apollo.ApolloOpts().SetNamespace("application"))
|
||||
if err != nil {
|
||||
logger.Error("Apollo read failed : %v", err)
|
||||
return make(map[int64]int64), 0, errcode.ErrCodeApolloReadFail
|
||||
}
|
||||
|
||||
// 从redis中读取数据
|
||||
mp = make(map[int64]int64)
|
||||
if req.NType != nil {
|
||||
nType := util.DerefInt64(req.NType)
|
||||
total, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, nType))
|
||||
mp[nType] = total
|
||||
stotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, nType))
|
||||
mp[nType] = stotal
|
||||
} else {
|
||||
sysTotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, consts.Notif_System))
|
||||
audTotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, consts.Notif_Audit))
|
||||
vasTotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, consts.Notif_Vas))
|
||||
mp[consts.Notif_System] = sysTotal
|
||||
mp[consts.Notif_Audit] = audTotal
|
||||
mp[consts.Notif_Vas] = vasTotal
|
||||
for k, _ := range cfg.Map {
|
||||
stotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, k))
|
||||
mp[k] = stotal
|
||||
}
|
||||
total, _ = redis.GetRedisClient().GetInt64(util.GetNotifUrcTotalIdForRedis(req.BaseRequest.Mid))
|
||||
}
|
||||
|
||||
return
|
||||
|
|
|
@ -267,3 +267,12 @@ func (p *Account) OpListByPhoneHash(ctx *gin.Context, phonehash string) ([]*dbst
|
|||
}
|
||||
return list, nil
|
||||
}
|
||||
|
||||
func (p *Account) GetStreamerAccountMids(ctx *gin.Context) ([]int64, error) {
|
||||
list, err := p.store.GetStreamerAccountMids(ctx)
|
||||
if err != nil {
|
||||
logger.Error("GetStreamerAccountMids fail, err: %v", err)
|
||||
return make([]int64, 0), err
|
||||
}
|
||||
return list, nil
|
||||
}
|
||||
|
|
|
@ -1,10 +1,14 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"service/api/adapter/firenzeapi"
|
||||
"service/api/consts"
|
||||
"service/api/errcode"
|
||||
firenzeproto "service/api/proto/firenze"
|
||||
"service/bizcommon/util"
|
||||
"service/dbstruct"
|
||||
"service/library/logger"
|
||||
"service/library/melody"
|
||||
"service/library/redis"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
|
@ -51,6 +55,7 @@ func (s *NotifBcstCenter) BcstNotifs(ctx *gin.Context, nids []int64, objType int
|
|||
}
|
||||
return err
|
||||
}
|
||||
|
||||
err = _DefaultNotification.OpUpdateByIds(ctx, &dbstruct.Notification{
|
||||
Status: goproto.Int64(consts.Notification_Pushed),
|
||||
}, nids)
|
||||
|
@ -58,6 +63,9 @@ func (s *NotifBcstCenter) BcstNotifs(ctx *gin.Context, nids []int64, objType int
|
|||
logger.Error("_DefaultNotification OpUpdateByIds fail, err: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// 长链接广播
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -119,6 +127,7 @@ func (s *NotifBcstCenter) pushNotifsToMids(ctx *gin.Context, nids []int64, objMi
|
|||
for _, mid := range objMids {
|
||||
for nType, total := range nTypeTotalMap {
|
||||
_, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcIdForRedis(mid, nType), total)
|
||||
_, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcTotalIdForRedis(mid), total)
|
||||
if err != nil {
|
||||
logger.Error("Redis IncrBy fail, err: %v", err)
|
||||
return err
|
||||
|
@ -222,6 +231,7 @@ func (s *NotifBcstCenter) pullAllBcstedNotifs(ctx *gin.Context, vers, receiveVer
|
|||
// 记录未读总数
|
||||
for nType, total := range nTypeTotalMap {
|
||||
_, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcIdForRedis(objMid, nType), total)
|
||||
_, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcTotalIdForRedis(objMid), total)
|
||||
if err != nil {
|
||||
logger.Error("Redis IncrBy fail, err: %v", err)
|
||||
return err
|
||||
|
@ -230,3 +240,86 @@ func (s *NotifBcstCenter) pullAllBcstedNotifs(ctx *gin.Context, vers, receiveVer
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *NotifBcstCenter) bcstThroughWebsocket(ctx *gin.Context, nids []int64, objType int64, objMids []int64) error {
|
||||
|
||||
// 查询最近nid的消息内容
|
||||
notif, err := _DefaultNotification.GetListById(ctx, util.Int64SliceMax(nids))
|
||||
if err != nil {
|
||||
logger.Error("_DefaultNotification GetListById fail, err: %v", err)
|
||||
return err
|
||||
}
|
||||
urc := int64(len(nids))
|
||||
|
||||
// 根据通知类型进行广播
|
||||
switch objType {
|
||||
case consts.Notification_ObjType_Customized:
|
||||
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, objMids)
|
||||
case consts.Notification_ObjType_AllStreamer:
|
||||
|
||||
case consts.Notification_ObjType_AllUser:
|
||||
|
||||
case consts.Notification_ObjType_AllStreamerAndUser:
|
||||
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *NotifBcstCenter) bcstToMidsThroughWebsocket(ctx *gin.Context, notif *dbstruct.Notification, urc int64, objMids []int64) error {
|
||||
for _, mid := range objMids {
|
||||
total, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcTotalIdForRedis(mid))
|
||||
param := &firenzeproto.SendBizMsgParam{
|
||||
Mid: mid,
|
||||
}
|
||||
msg := melody.BizMsg{
|
||||
Type: firenzeproto.MsgTypeSysNotifyTab,
|
||||
Data: map[string]any{
|
||||
"unread_cnt": total + urc,
|
||||
"notif": notif,
|
||||
},
|
||||
}
|
||||
param.Msg = util.MustMarshal(msg)
|
||||
resp, err := firenzeapi.SendBizMsg(param)
|
||||
if err != nil {
|
||||
logger.Error("SendBizMsg fail, resp: %v, err: %v", util.ToJson(resp), err)
|
||||
}
|
||||
if resp.ErrCode != errcode.ErrCodeOk {
|
||||
logger.Error("SendBizMsg fail, invalid resp, resp: %v, err: %v", util.ToJson(resp), err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *NotifBcstCenter) groupOnlineMids(ctx *gin.Context) (streamerMids []int64, userMids []int64, err error) {
|
||||
|
||||
streamerMids = make([]int64, 0)
|
||||
userMids = make([]int64, 0)
|
||||
|
||||
// 获取到所有在线用户
|
||||
param := &firenzeproto.OnlineMidsParam{}
|
||||
resp, err := firenzeapi.OnlineMids(param)
|
||||
if err != nil {
|
||||
logger.Error("OnlineMids fail, resp: %v, err: %v", util.ToJson(resp), err)
|
||||
return
|
||||
}
|
||||
if resp.ErrCode != errcode.ErrCodeOk {
|
||||
logger.Error("OnlineMids fail, invalid resp, resp: %v, err: %v", util.ToJson(resp), err)
|
||||
return
|
||||
}
|
||||
|
||||
smids := make([]int64, 0)
|
||||
smids, err = _DefaultAccount.GetStreamerAccountMids(ctx)
|
||||
if err != nil {
|
||||
logger.Error("GetStreamerAccountMids fail, resp: %v, err: %v", util.ToJson(resp), err)
|
||||
return
|
||||
}
|
||||
|
||||
// 切片
|
||||
for i := 0; i < len(resp.Data.Mids); i += 5000 {
|
||||
upper := util.Int64Min(int64(len(resp.Data.Mids)), int64(i+5000-1))
|
||||
mids := resp.Data.Mids[i:upper]
|
||||
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -5324,6 +5324,8 @@ func (s *Service) OpReloadNotificationUrc(ctx *gin.Context, req *base.BaseReques
|
|||
if err != nil {
|
||||
logger.Error("OpGetUrc fail, req: %v, err: %v", util.ToJson(req), err)
|
||||
}
|
||||
|
||||
midMp := make(map[int64]*dbstruct.Account)
|
||||
for _, mp := range results {
|
||||
idMp := mp["_id"].(map[string]any)
|
||||
count := mp["count"].(int32)
|
||||
|
@ -5334,6 +5336,19 @@ func (s *Service) OpReloadNotificationUrc(ctx *gin.Context, req *base.BaseReques
|
|||
if err != nil {
|
||||
logger.Error("Redis Set fail, err: %v", err)
|
||||
}
|
||||
|
||||
if midMp[mid] == nil {
|
||||
midMp[mid] = &dbstruct.Account{}
|
||||
err = redis.GetRedisClient().Set(util.GetNotifUrcTotalIdForRedis(mid), int64(count), 0)
|
||||
if err != nil {
|
||||
logger.Error("Redis Set fail, err: %v", err)
|
||||
}
|
||||
} else {
|
||||
_, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcTotalIdForRedis(mid), int64(count))
|
||||
if err != nil {
|
||||
logger.Error("Redis Set fail, err: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
|
@ -455,3 +455,55 @@ func GetNotifScene(key string, option int64) int64 {
|
|||
func GetNotifUrcIdForRedis(mid, nType int64) string {
|
||||
return fmt.Sprintf("%surc_%d_%d", consts.RedisNotificationPrefix, mid, nType)
|
||||
}
|
||||
|
||||
func GetNotifUrcTotalIdForRedis(mid int64) string {
|
||||
return fmt.Sprintf("%surc_%d", consts.RedisNotificationPrefix, mid)
|
||||
}
|
||||
|
||||
func Int64Max(x, y int64) int64 {
|
||||
if x > y {
|
||||
return x
|
||||
}
|
||||
return y
|
||||
}
|
||||
|
||||
func Int64Min(x, y int64) int64 {
|
||||
if x < y {
|
||||
return x
|
||||
}
|
||||
return y
|
||||
}
|
||||
|
||||
func Int64SliceMax(s []int64) int64 {
|
||||
max := int64(math.MinInt64)
|
||||
for _, v := range s {
|
||||
max = Int64Max(max, v)
|
||||
}
|
||||
return max
|
||||
}
|
||||
|
||||
func Int64SliceMin(s []int64) int64 {
|
||||
min := int64(math.MaxInt64)
|
||||
for _, v := range s {
|
||||
min = Int64Min(min, v)
|
||||
}
|
||||
return min
|
||||
}
|
||||
|
||||
// 将升序排序后的int64数组切分成含s2元素的和不含s2元素的
|
||||
func SplitSortedInt64Slice(bs []int64, rs []int64) ([]int64, []int64) {
|
||||
|
||||
if len(bs) == 0 || len(rs) == 0 {
|
||||
return bs, make([]int64, 0)
|
||||
}
|
||||
|
||||
s1 := make([]int64, 0)
|
||||
s2 := make([]int64, 0)
|
||||
|
||||
rindex := 0
|
||||
for _, e := range bs {
|
||||
if e < rs[rindex] {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue