by Robin at 20241216

This commit is contained in:
Robin 2024-12-16 17:38:14 +08:00
parent a5db8148b7
commit f73b7711da
3 changed files with 115 additions and 74 deletions

View File

@ -104,49 +104,39 @@ func (s *NotifBcstCenter) bcstNotifsToAll(ctx *gin.Context, nids []int64, objTyp
// 直接推送至用户
func (s *NotifBcstCenter) pushNotifsToMids(ctx *gin.Context, nids []int64, objMids []int64) error {
// 查询得到消息
notifMap := make(map[int64]*dbstruct.Notification)
notifs, err := _DefaultNotification.GetListByIds(ctx, nids)
// 查询得到信息
info, err := s.extractNotifInfo(ctx, nids)
if err != nil {
logger.Error("GetNotificationListByIds fail, err: %v", err)
logger.Error("extractNotifInfo fail, err: %v", err)
return err
}
for _, notif := range notifs {
notifMap[notif.GetId()] = notif
}
notifReceives := make([]*dbstruct.NotifReceive, 0)
midRecentReceiveMap := make(map[int64]map[int64]*dbstruct.NotifRecentReceive) // 最近的一条通知nid
for _, mid := range objMids {
recentReceiveMap := make(map[int64]*dbstruct.NotifRecentReceive)
for _, nid := range nids {
notifReceive := &dbstruct.NotifReceive{
ObjMid: mid,
Nid: nid,
NType: notifMap[nid].GetNType(),
IsRead: consts.NotifReceive_NotRead,
Ct: notifMap[nid].GetRt(),
}
notifReceives = append(notifReceives, notifReceive)
s.tryToReplaceRecentReceive(recentReceiveMap, notifReceive)
}
midRecentReceiveMap[mid] = recentReceiveMap
}
logger.Info("trying to update recent_receive...")
logger.Info("mid_recent_receive_map: %v", midRecentReceiveMap)
// 尝试更新最近接收消息记录
for mid, recentReceiveMap := range midRecentReceiveMap {
logger.Info("recent_receive_map: %v", recentReceiveMap)
for nType, recentReceive := range recentReceiveMap {
for _, mid := range objMids {
// 复制信息
for _, v := range info.NotifReceives {
notifReceive := v.Copy()
notifReceive.ObjMid = mid
notifReceives = append(notifReceives, notifReceive)
}
// 尝试更新最近接收消息记录
for nType, recentReceive := range info.RecentReceiveMap {
id := util.GetNotifRecentReceiveId(mid, nType)
logger.Info("trying to update recent_receive for %v to %v", id, recentReceive)
_, err := _DefaultNotifRecentReceive.GetAndUpdateNotifRecentReceive(ctx, id, recentReceive)
if err != nil {
logger.Error("GetAndUpdateNotifRecentReceive fail, err: %v", err)
return err
}
}
// 新增未读数
for nType, incr := range info.UrcMap {
err := DefaultService.utilIncrUrc(mid, nType, incr)
if err != nil {
logger.Error("utilIncrUrc fail, err: %v", err)
return err
}
}
}
err = _DefaultNotifReceive.OpCreateBatch(ctx, notifReceives)
@ -158,6 +148,44 @@ func (s *NotifBcstCenter) pushNotifsToMids(ctx *gin.Context, nids []int64, objMi
return nil
}
type ExtractedNotifReceiveInfo struct {
NotifReceives []*dbstruct.NotifReceive // 接收记录
RecentReceiveMap map[int64]*dbstruct.NotifRecentReceive // 最近接收消息map从notification生成
UrcMap map[int64]int64 // 需要自增的map从notification生成
}
// 提取消息数组的信息
func (s *NotifBcstCenter) extractNotifInfo(ctx *gin.Context, nids []int64) (info *ExtractedNotifReceiveInfo, recenterr error) {
info = &ExtractedNotifReceiveInfo{
NotifReceives: make([]*dbstruct.NotifReceive, 0),
RecentReceiveMap: make(map[int64]*dbstruct.NotifRecentReceive),
UrcMap: make(map[int64]int64),
}
// 查询得到消息
notifs, err := _DefaultNotification.GetListByIds(ctx, nids)
if err != nil {
logger.Error("GetNotificationListByIds fail, err: %v", err)
return info, err
}
for _, notif := range notifs {
// 组装接收记录
notifReceive := &dbstruct.NotifReceive{
Nid: notif.GetId(),
NType: notif.GetNType(),
IsRead: consts.NotifReceive_NotRead,
Ct: notif.GetRt(),
}
info.NotifReceives = append(info.NotifReceives, notifReceive)
s.tryToReplaceRecentReceive(info.RecentReceiveMap, notifReceive)
info.UrcMap[notif.GetNType()]++
}
return
}
// 主播获取全局广播消息
func (s *NotifBcstCenter) ReceiveAllBcstedNotifs(ctx *gin.Context, mid, objType int64) error {
// 获取当前主播/用户全局广播的版本号(注意这个版本号-1才是当前数据库有的最新版本号
@ -214,51 +242,45 @@ func (s *NotifBcstCenter) pullAllBcstedNotifs(ctx *gin.Context, vers, receiveVer
logger.Error("OpListByVersRange fail, err: %v", err)
return err
}
// 初始化接收表记录
notifReceives := make([]*dbstruct.NotifReceive, 0)
notifReceiveMap := make(map[int64]*dbstruct.NotifReceive)
// 获取nids
nids := make([]int64, 0)
// 填充接收表记录
for _, notifBcst := range notifBcsts {
for _, nid := range notifBcst.Nids {
notifReceive := &dbstruct.NotifReceive{
ObjMid: objMid,
Nid: nid,
IsRead: consts.NotifReceive_NotRead,
}
notifReceives = append(notifReceives, notifReceive)
notifReceiveMap[nid] = notifReceive
nids = append(nids, nid)
}
nids = append(nids, notifBcst.Nids...)
}
// 初始化最近接收记录信息
recentReceiveMap := make(map[int64]*dbstruct.NotifRecentReceive) // 最近的一条通知nid
// 查询通知详细信息
notifs, err := _DefaultNotification.GetListByIds(ctx, nids)
// 查询得到接收表信息
info, err := s.extractNotifInfo(ctx, nids)
if err != nil {
logger.Error("GetNotificationListByIds fail, err: %v", err)
logger.Error("extractNotifInfo fail, err: %v", err)
return err
}
// 补足接收表的接收时间和消息类型
for _, notif := range notifs {
notifReceive, ok := notifReceiveMap[notif.GetId()]
if ok {
notifReceive.NType = notif.GetNType()
notifReceive.Ct = notif.GetRt()
s.tryToReplaceRecentReceive(recentReceiveMap, notifReceive)
}
notifReceives := make([]*dbstruct.NotifReceive, 0)
// 复制信息
for _, v := range info.NotifReceives {
notifReceive := v.Copy()
notifReceive.ObjMid = objMid
notifReceives = append(notifReceives, notifReceive)
}
// 尝试更新最近接收消息记录
logger.Info("recent_receive_map: %v", recentReceiveMap)
for nType, recentReceive := range recentReceiveMap {
for nType, recentReceive := range info.RecentReceiveMap {
id := util.GetNotifRecentReceiveId(objMid, nType)
logger.Info("trying to update recent_receive for %v to %v", id, recentReceive)
_, err := _DefaultNotifRecentReceive.GetAndUpdateNotifRecentReceive(ctx, id, recentReceive)
if err != nil {
logger.Error("GetAndUpdateNotifRecentReceive fail, err: %v", err)
return err
}
}
// 新增未读数
for nType, incr := range info.UrcMap {
err := DefaultService.utilIncrUrc(objMid, nType, incr)
if err != nil {
logger.Error("utilIncrUrc fail, err: %v", err)
return err
}
}
err = _DefaultNotifReceive.OpCreateBatch(ctx, notifReceives)
if err != nil {
@ -277,49 +299,55 @@ func (s *NotifBcstCenter) bcstThroughWebsocket(ctx *gin.Context, nids []int64, o
logger.Error("_DefaultNotification GetListById fail, err: %v", err)
return err
}
urc := int64(len(nids))
// 根据通知类型进行广播
switch objType {
case consts.Notification_ObjType_Customized:
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, objMids)
return s.bcstToMidsThroughWebsocket(ctx, notif, objType, objMids)
case consts.Notification_ObjType_AllStreamer:
smids, _, err := s.groupOnlineMids(ctx)
if err != nil {
logger.Error("groupOnlineMids fail, resp: %v, err: %v", err)
return err
}
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, smids)
return s.bcstToMidsThroughWebsocket(ctx, notif, objType, smids)
case consts.Notification_ObjType_AllUser:
_, umids, err := s.groupOnlineMids(ctx)
if err != nil {
logger.Error("groupOnlineMids fail, resp: %v, err: %v", err)
return err
}
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, umids)
return s.bcstToMidsThroughWebsocket(ctx, notif, objType, umids)
case consts.Notification_ObjType_AllStreamerAndUser:
mids, err := s.getOnlineMids()
smids, umids, err := s.groupOnlineMids(ctx)
if err != nil {
logger.Error("getOnlineMids fail, resp: %v, err: %v", err)
logger.Error("groupOnlineMids fail, resp: %v, err: %v", err)
return err
}
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, mids)
err = s.bcstToMidsThroughWebsocket(ctx, notif, consts.Notification_ObjType_AllStreamer, smids)
if err != nil {
logger.Error("bcstToMidsThroughWebsocket fail, resp: %v, err: %v", err)
return err
}
return s.bcstToMidsThroughWebsocket(ctx, notif, consts.Notification_ObjType_AllUser, umids)
default:
return nil
}
}
func (s *NotifBcstCenter) bcstToMidsThroughWebsocket(ctx *gin.Context, notif *dbstruct.Notification, urc int64, objMids []int64) error {
func (s *NotifBcstCenter) bcstToMidsThroughWebsocket(ctx *gin.Context, notif *dbstruct.Notification, objType int64, objMids []int64) error {
for _, mid := range objMids {
// 先更新未读总数
err := DefaultService.utilIncrUrc(mid, notif.GetNType(), urc)
if err != nil {
logger.Error("utilIncrUrc fail, err: %v", err)
return err
// 若是广播消息,则在客户端进行一次消息的接收
if objType != consts.Notification_ObjType_Customized {
err := s.ReceiveAllBcstedNotifs(ctx, mid, objType)
if err != nil {
logger.Error("ReceiveAllBcstedNotifs failed, mid: %v, objType: %v, err: %v", mid, objType, err)
}
}
total, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcTotalIdForRedis(mid))
param := &firenzeproto.SendBizMsgParam{
Mid: mid,
}

View File

@ -2143,7 +2143,7 @@ func (s *Service) OpApproveStreamerAuthApprovalBasic(ctx *gin.Context, req *stre
}
// 发送通知
DefaultNotifBuilderHandler.Handle(ctx)(consts.AudNotifTemp_StreamerBasicInfoPassed)(midList)
DefaultNotifBuilderHandler.Handle(ctx)(consts.AudNotifTemp_StreamerBasicInfoPassed, consts.CtrlNotifTemp_SyncNotifBcstVersForStreamer)(midList)
}
return

View File

@ -109,6 +109,19 @@ type NotifReceive struct {
DelFlag int64 `json:"del_flag" bson:"del_flag"` // 删除标记
}
func (p *NotifReceive) Copy() *NotifReceive {
return &NotifReceive{
Id: p.Id,
ObjMid: p.ObjMid,
Nid: p.Nid,
NType: p.NType,
IsRead: p.IsRead,
Ct: p.Ct,
Ut: p.Ut,
DelFlag: p.DelFlag,
}
}
type NotifReceivePull struct {
Id int64 `json:"id" bson:"_id"` // 通知接收拉取表id
ObjMid int64 `json:"obj_mid" bson:"obj_mid"` // 通知接收人mid