Merge branch 'feat-IRONFANS-212-Robin' into conf-212

This commit is contained in:
Robin 2024-12-02 16:02:53 +08:00
commit 8d1229aa79
9 changed files with 278 additions and 15 deletions

View File

@ -72,6 +72,7 @@ const (
SmsSplitRatioKey = "sms_split_ratio"
NotifBannerInfoKey = "notif_banner_info"
InstallInfoKey = "install_info"
NTypeListKey = "n_type_list"
AcctPunishmentRealEndTimeKey = "acct_punishment_real_endtime"
)

View File

@ -1,11 +1,11 @@
package consts
// notif_template
const (
Notif_System = 0 // 系统消息
Notif_Audit = 1 // 审核消息
Notif_Vas = 2 // 付费消息
Notif_System = 0 // 系统消息
Notif_Audit = 1 // 审核消息
Notif_Vas = 2 // 付费消息
Notif_Activity = 3 // 活动消息
)
const (
CtrlNotifTemp_CancelNotif = 1732523321 // 取消推送通知

View File

@ -0,0 +1,5 @@
package apollostruct
type NTypeMapCfg struct {
Map map[int64]string `json:"map"`
}

View File

@ -1601,6 +1601,27 @@ func (m *Mongo) GetLastHourNewUserFromH5Count(ctx *gin.Context, req *accountprot
return count, err
}
func (m *Mongo) GetStreamerAccountMids(ctx *gin.Context) ([]int64, error) {
col := m.getColAccount()
list := make([]int64, 0)
query := qmgo.M{
"role": qmgo.M{
"$in": []int64{consts.Streamer, consts.StreamerToBe},
},
}
err := col.Find(ctx, query).Sort("_id").Select("_id").All(&list)
if err == qmgo.ErrNoSuchDocuments {
err = nil
return nil, err
}
if err != nil {
return nil, err
}
return list, err
}
func (m *Mongo) CreateAccountHis(ctx *gin.Context, accounts []*dbstruct.Account) error {
col := m.getColAccountHis()
_, err := col.InsertMany(ctx, accounts)

View File

@ -4753,12 +4753,19 @@ func (s *Service) ApiReadNotification(ctx *gin.Context, req *notificationproto.A
// 以真更新的条数decr
key := util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, req.NType)
totalKey := util.GetNotifUrcTotalIdForRedis(req.BaseRequest.Mid)
_, err = redis.GetRedisClient().DecrBy(key, result.ModifiedCount)
if err != nil {
logger.Error("Redis DecrBy fail, err: %v", err)
ec = errcode.ErrCodeNotificationSrvFail
return
}
_, err = redis.GetRedisClient().DecrBy(totalKey, result.ModifiedCount)
if err != nil {
logger.Error("Redis DecrBy fail, err: %v", err)
ec = errcode.ErrCodeNotificationSrvFail
return
}
return
}
@ -4776,32 +4783,46 @@ func (s *Service) ApiReadAllNotification(ctx *gin.Context, req *notificationprot
// 以真更新的条数decr
key := util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, req.NType)
totalKey := util.GetNotifUrcTotalIdForRedis(req.BaseRequest.Mid)
_, err = redis.GetRedisClient().DecrBy(key, result.ModifiedCount)
if err != nil {
logger.Error("Redis DecrBy fail, err: %v", err)
ec = errcode.ErrCodeNotificationSrvFail
return
}
_, err = redis.GetRedisClient().DecrBy(totalKey, result.ModifiedCount)
if err != nil {
logger.Error("Redis DecrBy fail, err: %v", err)
ec = errcode.ErrCodeNotificationSrvFail
return
}
return
}
func (s *Service) ApiGetNotificationUrcByMid(ctx *gin.Context, req *notificationproto.ApiCountUnreadReq) (mp map[int64]int64, ec errcode.ErrCode) {
func (s *Service) ApiGetNotificationUrcByMid(ctx *gin.Context, req *notificationproto.ApiCountUnreadReq) (mp map[int64]int64, total int64, ec errcode.ErrCode) {
ec = errcode.ErrCodeNotificationSrvOk
// 获取apollo配置
cfg := apollostruct.NTypeMapCfg{}
err := apollo.GetJson("n_type_map", &cfg, apollo.ApolloOpts().SetNamespace("application"))
if err != nil {
logger.Error("Apollo read failed : %v", err)
return make(map[int64]int64), 0, errcode.ErrCodeApolloReadFail
}
// 从redis中读取数据
mp = make(map[int64]int64)
if req.NType != nil {
nType := util.DerefInt64(req.NType)
total, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, nType))
mp[nType] = total
stotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, nType))
mp[nType] = stotal
} else {
sysTotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, consts.Notif_System))
audTotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, consts.Notif_Audit))
vasTotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, consts.Notif_Vas))
mp[consts.Notif_System] = sysTotal
mp[consts.Notif_Audit] = audTotal
mp[consts.Notif_Vas] = vasTotal
for k := range cfg.Map {
stotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, k))
mp[k] = stotal
}
total, _ = redis.GetRedisClient().GetInt64(util.GetNotifUrcTotalIdForRedis(req.BaseRequest.Mid))
}
return

View File

@ -267,3 +267,12 @@ func (p *Account) OpListByPhoneHash(ctx *gin.Context, phonehash string) ([]*dbst
}
return list, nil
}
func (p *Account) GetStreamerAccountMids(ctx *gin.Context) ([]int64, error) {
list, err := p.store.GetStreamerAccountMids(ctx)
if err != nil {
logger.Error("GetStreamerAccountMids fail, err: %v", err)
return make([]int64, 0), err
}
return list, nil
}

View File

@ -1,10 +1,14 @@
package service
import (
"service/api/adapter/firenzeapi"
"service/api/consts"
"service/api/errcode"
firenzeproto "service/api/proto/firenze"
"service/bizcommon/util"
"service/dbstruct"
"service/library/logger"
"service/library/melody"
"service/library/redis"
"github.com/gin-gonic/gin"
@ -51,6 +55,7 @@ func (s *NotifBcstCenter) BcstNotifs(ctx *gin.Context, nids []int64, objType int
}
return err
}
err = _DefaultNotification.OpUpdateByIds(ctx, &dbstruct.Notification{
Status: goproto.Int64(consts.Notification_Pushed),
}, nids)
@ -58,6 +63,15 @@ func (s *NotifBcstCenter) BcstNotifs(ctx *gin.Context, nids []int64, objType int
logger.Error("_DefaultNotification OpUpdateByIds fail, err: %v", err)
return err
}
// 长链接广播
go func() {
err := s.bcstThroughWebsocket(ctx, nids, objType, objMids)
if err != nil {
logger.Error("bcstThroughWebsocket fail, err: %v", err)
}
}()
return nil
}
@ -123,6 +137,11 @@ func (s *NotifBcstCenter) pushNotifsToMids(ctx *gin.Context, nids []int64, objMi
logger.Error("Redis IncrBy fail, err: %v", err)
return err
}
_, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcTotalIdForRedis(mid), total)
if err != nil {
logger.Error("Redis IncrBy fail, err: %v", err)
return err
}
}
}
@ -226,7 +245,121 @@ func (s *NotifBcstCenter) pullAllBcstedNotifs(ctx *gin.Context, vers, receiveVer
logger.Error("Redis IncrBy fail, err: %v", err)
return err
}
_, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcTotalIdForRedis(objMid), total)
if err != nil {
logger.Error("Redis IncrBy fail, err: %v", err)
return err
}
}
return nil
}
func (s *NotifBcstCenter) bcstThroughWebsocket(ctx *gin.Context, nids []int64, objType int64, objMids []int64) error {
// 查询最近nid的消息内容
notif, err := _DefaultNotification.GetListById(ctx, util.Int64SliceMax(nids))
if err != nil {
logger.Error("_DefaultNotification GetListById fail, err: %v", err)
return err
}
urc := int64(len(nids))
// 根据通知类型进行广播
switch objType {
case consts.Notification_ObjType_Customized:
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, objMids)
case consts.Notification_ObjType_AllStreamer:
smids, _, err := s.groupOnlineMids(ctx)
if err != nil {
logger.Error("groupOnlineMids fail, resp: %v, err: %v", err)
return err
}
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, smids)
case consts.Notification_ObjType_AllUser:
_, umids, err := s.groupOnlineMids(ctx)
if err != nil {
logger.Error("groupOnlineMids fail, resp: %v, err: %v", err)
return err
}
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, umids)
case consts.Notification_ObjType_AllStreamerAndUser:
mids, err := s.getOnlineMids()
if err != nil {
logger.Error("getOnlineMids fail, resp: %v, err: %v", err)
return err
}
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, mids)
default:
return nil
}
}
func (s *NotifBcstCenter) bcstToMidsThroughWebsocket(ctx *gin.Context, notif *dbstruct.Notification, urc int64, objMids []int64) error {
for _, mid := range objMids {
total, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcTotalIdForRedis(mid))
param := &firenzeproto.SendBizMsgParam{
Mid: mid,
}
msg := melody.BizMsg{
Type: firenzeproto.MsgTypeSysNotifyTab,
Data: map[string]any{
"unread_cnt": total + urc,
"notif": notif,
},
}
param.Msg = util.MustMarshal(msg)
resp, err := firenzeapi.SendBizMsg(param)
if err != nil {
logger.Error("SendBizMsg fail, resp: %v, err: %v", util.ToJson(resp), err)
}
if resp.ErrCode != errcode.ErrCodeOk {
logger.Error("SendBizMsg fail, invalid resp, resp: %v, err: %v", util.ToJson(resp), err)
}
}
return nil
}
func (s *NotifBcstCenter) getOnlineMids() (mids []int64, err error) {
// 获取到所有在线用户
param := &firenzeproto.OnlineMidsParam{}
resp, err := firenzeapi.OnlineMids(param)
if err != nil {
logger.Error("OnlineMids fail, resp: %v, err: %v", util.ToJson(resp), err)
return
}
if resp.ErrCode != errcode.ErrCodeOk {
logger.Error("OnlineMids fail, invalid resp, resp: %v, err: %v", util.ToJson(resp), err)
return
}
return resp.Data.Mids, nil
}
func (s *NotifBcstCenter) groupOnlineMids(ctx *gin.Context) (streamerMids []int64, userMids []int64, err error) {
streamerMids = make([]int64, 0)
userMids = make([]int64, 0)
// 获取到所有在线用户
param := &firenzeproto.OnlineMidsParam{}
resp, err := firenzeapi.OnlineMids(param)
if err != nil {
logger.Error("OnlineMids fail, resp: %v, err: %v", util.ToJson(resp), err)
return
}
if resp.ErrCode != errcode.ErrCodeOk {
logger.Error("OnlineMids fail, invalid resp, resp: %v, err: %v", util.ToJson(resp), err)
return
}
var smids []int64
smids, err = _DefaultAccount.GetStreamerAccountMids(ctx)
if err != nil {
logger.Error("GetStreamerAccountMids fail, resp: %v, err: %v", util.ToJson(resp), err)
return
}
// 切分用户和主播
streamerMids, userMids = util.SplitInt64Slice(resp.Data.Mids, smids)
return
}

View File

@ -275,13 +275,12 @@ func (s *Service) Init(c any) (err error) {
_DefaultRavenIQTest = logic.NewRavenIQTest(store)
_DefaultRavenIQTestVisit = logic.NewRavenIQTestVisit(store)
_DefaultHistory = logic.NewHistory(store)
_DefaultRavenIQTestVisit = logic.NewRavenIQTestVisit(store)
_DefaultVeriCodeWrongTimes = logic.NewVeriCodeWrongTimes(store)
_DefaultNotifBcstVers = logic.NewNotifBcstVers(store)
_DefaultNotifBcst = logic.NewNotifBcst(store)
_DefaultNotifReceive = logic.NewNotifReceive(store)
_DefaultNotifReceivePull = logic.NewNotifReceivePull(store)
_DefaultFrontendRoute = logic.NewFrontendRoute(store)
_DefaultVeriCodeWrongTimes = logic.NewVeriCodeWrongTimes(store)
_DefaultVas = logic.NewVas(store, _DefaultStreamer, _DefaultAccount, _DefaultZone, _DefaultZoneThirdPartner, _DefaultZoneCollaborator)
_DefaultStreamerAcct = logic.NewStreamerAcct(store)
@ -5341,6 +5340,8 @@ func (s *Service) OpReloadNotificationUrc(ctx *gin.Context, req *base.BaseReques
if err != nil {
logger.Error("OpGetUrc fail, req: %v, err: %v", util.ToJson(req), err)
}
midMp := make(map[int64]*dbstruct.Account)
for _, mp := range results {
idMp := mp["_id"].(map[string]any)
count := mp["count"].(int32)
@ -5351,6 +5352,19 @@ func (s *Service) OpReloadNotificationUrc(ctx *gin.Context, req *base.BaseReques
if err != nil {
logger.Error("Redis Set fail, err: %v", err)
}
if midMp[mid] == nil {
midMp[mid] = &dbstruct.Account{}
err = redis.GetRedisClient().Set(util.GetNotifUrcTotalIdForRedis(mid), int64(count), 0)
if err != nil {
logger.Error("Redis Set fail, err: %v", err)
}
} else {
_, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcTotalIdForRedis(mid), int64(count))
if err != nil {
logger.Error("Redis Set fail, err: %v", err)
}
}
}
}()

View File

@ -455,3 +455,62 @@ func GetNotifScene(key string, option int64) int64 {
func GetNotifUrcIdForRedis(mid, nType int64) string {
return fmt.Sprintf("%surc_%d_%d", consts.RedisNotificationPrefix, mid, nType)
}
func GetNotifUrcTotalIdForRedis(mid int64) string {
return fmt.Sprintf("%surc_%d", consts.RedisNotificationPrefix, mid)
}
func Int64Max(x, y int64) int64 {
if x > y {
return x
}
return y
}
func Int64Min(x, y int64) int64 {
if x < y {
return x
}
return y
}
func Int64SliceMax(s []int64) int64 {
max := int64(math.MinInt64)
for _, v := range s {
max = Int64Max(max, v)
}
return max
}
func Int64SliceMin(s []int64) int64 {
min := int64(math.MaxInt64)
for _, v := range s {
min = Int64Min(min, v)
}
return min
}
// 将int64数组切分成含s2元素的和不含s2元素的
func SplitInt64Slice(bs []int64, rs []int64) ([]int64, []int64) {
if len(bs) == 0 || len(rs) == 0 {
return bs, make([]int64, 0)
}
s1 := make([]int64, 0)
s2 := make([]int64, 0)
rsMp := make(map[int64]bool)
for _, v := range rs {
rsMp[v] = true
}
for _, bv := range bs {
if rsMp[bv] {
s1 = append(s1, bv)
} else {
s2 = append(s2, bv)
}
}
return s1, s2
}