Merge pull request 'by Robin at 20241216' (#906) from feat-IRONFANS-212-Robin into test
Reviewed-on: https://git.wishpal.cn/wishpal_ironfan/service/pulls/906
This commit is contained in:
commit
8a1181469e
|
@ -104,49 +104,39 @@ func (s *NotifBcstCenter) bcstNotifsToAll(ctx *gin.Context, nids []int64, objTyp
|
||||||
// 直接推送至用户
|
// 直接推送至用户
|
||||||
func (s *NotifBcstCenter) pushNotifsToMids(ctx *gin.Context, nids []int64, objMids []int64) error {
|
func (s *NotifBcstCenter) pushNotifsToMids(ctx *gin.Context, nids []int64, objMids []int64) error {
|
||||||
|
|
||||||
// 查询得到消息
|
// 查询得到信息
|
||||||
notifMap := make(map[int64]*dbstruct.Notification)
|
info, err := s.extractNotifInfo(ctx, nids)
|
||||||
notifs, err := _DefaultNotification.GetListByIds(ctx, nids)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("GetNotificationListByIds fail, err: %v", err)
|
logger.Error("extractNotifInfo fail, err: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, notif := range notifs {
|
|
||||||
notifMap[notif.GetId()] = notif
|
|
||||||
}
|
|
||||||
|
|
||||||
notifReceives := make([]*dbstruct.NotifReceive, 0)
|
notifReceives := make([]*dbstruct.NotifReceive, 0)
|
||||||
midRecentReceiveMap := make(map[int64]map[int64]*dbstruct.NotifRecentReceive) // 最近的一条通知nid
|
|
||||||
for _, mid := range objMids {
|
|
||||||
recentReceiveMap := make(map[int64]*dbstruct.NotifRecentReceive)
|
|
||||||
for _, nid := range nids {
|
|
||||||
notifReceive := &dbstruct.NotifReceive{
|
|
||||||
ObjMid: mid,
|
|
||||||
Nid: nid,
|
|
||||||
NType: notifMap[nid].GetNType(),
|
|
||||||
IsRead: consts.NotifReceive_NotRead,
|
|
||||||
Ct: notifMap[nid].GetRt(),
|
|
||||||
}
|
|
||||||
notifReceives = append(notifReceives, notifReceive)
|
|
||||||
s.tryToReplaceRecentReceive(recentReceiveMap, notifReceive)
|
|
||||||
}
|
|
||||||
midRecentReceiveMap[mid] = recentReceiveMap
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("trying to update recent_receive...")
|
for _, mid := range objMids {
|
||||||
logger.Info("mid_recent_receive_map: %v", midRecentReceiveMap)
|
// 复制信息
|
||||||
// 尝试更新最近接收消息记录
|
for _, v := range info.NotifReceives {
|
||||||
for mid, recentReceiveMap := range midRecentReceiveMap {
|
notifReceive := v.Copy()
|
||||||
logger.Info("recent_receive_map: %v", recentReceiveMap)
|
notifReceive.ObjMid = mid
|
||||||
for nType, recentReceive := range recentReceiveMap {
|
notifReceives = append(notifReceives, notifReceive)
|
||||||
|
}
|
||||||
|
// 尝试更新最近接收消息记录
|
||||||
|
for nType, recentReceive := range info.RecentReceiveMap {
|
||||||
id := util.GetNotifRecentReceiveId(mid, nType)
|
id := util.GetNotifRecentReceiveId(mid, nType)
|
||||||
logger.Info("trying to update recent_receive for %v to %v", id, recentReceive)
|
|
||||||
_, err := _DefaultNotifRecentReceive.GetAndUpdateNotifRecentReceive(ctx, id, recentReceive)
|
_, err := _DefaultNotifRecentReceive.GetAndUpdateNotifRecentReceive(ctx, id, recentReceive)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("GetAndUpdateNotifRecentReceive fail, err: %v", err)
|
logger.Error("GetAndUpdateNotifRecentReceive fail, err: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// 新增未读数
|
||||||
|
for nType, incr := range info.UrcMap {
|
||||||
|
err := DefaultService.utilIncrUrc(mid, nType, incr)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("utilIncrUrc fail, err: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = _DefaultNotifReceive.OpCreateBatch(ctx, notifReceives)
|
err = _DefaultNotifReceive.OpCreateBatch(ctx, notifReceives)
|
||||||
|
@ -158,6 +148,44 @@ func (s *NotifBcstCenter) pushNotifsToMids(ctx *gin.Context, nids []int64, objMi
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ExtractedNotifReceiveInfo struct {
|
||||||
|
NotifReceives []*dbstruct.NotifReceive // 接收记录
|
||||||
|
RecentReceiveMap map[int64]*dbstruct.NotifRecentReceive // 最近接收消息map,从notification生成
|
||||||
|
UrcMap map[int64]int64 // 需要自增的map,从notification生成
|
||||||
|
}
|
||||||
|
|
||||||
|
// 提取消息数组的信息
|
||||||
|
func (s *NotifBcstCenter) extractNotifInfo(ctx *gin.Context, nids []int64) (info *ExtractedNotifReceiveInfo, recenterr error) {
|
||||||
|
|
||||||
|
info = &ExtractedNotifReceiveInfo{
|
||||||
|
NotifReceives: make([]*dbstruct.NotifReceive, 0),
|
||||||
|
RecentReceiveMap: make(map[int64]*dbstruct.NotifRecentReceive),
|
||||||
|
UrcMap: make(map[int64]int64),
|
||||||
|
}
|
||||||
|
|
||||||
|
// 查询得到消息
|
||||||
|
notifs, err := _DefaultNotification.GetListByIds(ctx, nids)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("GetNotificationListByIds fail, err: %v", err)
|
||||||
|
return info, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, notif := range notifs {
|
||||||
|
// 组装接收记录
|
||||||
|
notifReceive := &dbstruct.NotifReceive{
|
||||||
|
Nid: notif.GetId(),
|
||||||
|
NType: notif.GetNType(),
|
||||||
|
IsRead: consts.NotifReceive_NotRead,
|
||||||
|
Ct: notif.GetRt(),
|
||||||
|
}
|
||||||
|
info.NotifReceives = append(info.NotifReceives, notifReceive)
|
||||||
|
s.tryToReplaceRecentReceive(info.RecentReceiveMap, notifReceive)
|
||||||
|
info.UrcMap[notif.GetNType()]++
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// 主播获取全局广播消息
|
// 主播获取全局广播消息
|
||||||
func (s *NotifBcstCenter) ReceiveAllBcstedNotifs(ctx *gin.Context, mid, objType int64) error {
|
func (s *NotifBcstCenter) ReceiveAllBcstedNotifs(ctx *gin.Context, mid, objType int64) error {
|
||||||
// 获取当前主播/用户全局广播的版本号(注意这个版本号-1才是当前数据库有的最新版本号)
|
// 获取当前主播/用户全局广播的版本号(注意这个版本号-1才是当前数据库有的最新版本号)
|
||||||
|
@ -214,51 +242,45 @@ func (s *NotifBcstCenter) pullAllBcstedNotifs(ctx *gin.Context, vers, receiveVer
|
||||||
logger.Error("OpListByVersRange fail, err: %v", err)
|
logger.Error("OpListByVersRange fail, err: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// 初始化接收表记录
|
|
||||||
notifReceives := make([]*dbstruct.NotifReceive, 0)
|
// 获取nids
|
||||||
notifReceiveMap := make(map[int64]*dbstruct.NotifReceive)
|
|
||||||
nids := make([]int64, 0)
|
nids := make([]int64, 0)
|
||||||
// 填充接收表记录
|
|
||||||
for _, notifBcst := range notifBcsts {
|
for _, notifBcst := range notifBcsts {
|
||||||
for _, nid := range notifBcst.Nids {
|
nids = append(nids, notifBcst.Nids...)
|
||||||
notifReceive := &dbstruct.NotifReceive{
|
|
||||||
ObjMid: objMid,
|
|
||||||
Nid: nid,
|
|
||||||
IsRead: consts.NotifReceive_NotRead,
|
|
||||||
}
|
|
||||||
notifReceives = append(notifReceives, notifReceive)
|
|
||||||
notifReceiveMap[nid] = notifReceive
|
|
||||||
nids = append(nids, nid)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// 初始化最近接收记录信息
|
|
||||||
recentReceiveMap := make(map[int64]*dbstruct.NotifRecentReceive) // 最近的一条通知nid
|
// 查询得到接收表信息
|
||||||
// 查询通知详细信息
|
info, err := s.extractNotifInfo(ctx, nids)
|
||||||
notifs, err := _DefaultNotification.GetListByIds(ctx, nids)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("GetNotificationListByIds fail, err: %v", err)
|
logger.Error("extractNotifInfo fail, err: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// 补足接收表的接收时间和消息类型
|
|
||||||
for _, notif := range notifs {
|
notifReceives := make([]*dbstruct.NotifReceive, 0)
|
||||||
notifReceive, ok := notifReceiveMap[notif.GetId()]
|
|
||||||
if ok {
|
// 复制信息
|
||||||
notifReceive.NType = notif.GetNType()
|
for _, v := range info.NotifReceives {
|
||||||
notifReceive.Ct = notif.GetRt()
|
notifReceive := v.Copy()
|
||||||
s.tryToReplaceRecentReceive(recentReceiveMap, notifReceive)
|
notifReceive.ObjMid = objMid
|
||||||
}
|
notifReceives = append(notifReceives, notifReceive)
|
||||||
}
|
}
|
||||||
// 尝试更新最近接收消息记录
|
// 尝试更新最近接收消息记录
|
||||||
logger.Info("recent_receive_map: %v", recentReceiveMap)
|
for nType, recentReceive := range info.RecentReceiveMap {
|
||||||
for nType, recentReceive := range recentReceiveMap {
|
|
||||||
id := util.GetNotifRecentReceiveId(objMid, nType)
|
id := util.GetNotifRecentReceiveId(objMid, nType)
|
||||||
logger.Info("trying to update recent_receive for %v to %v", id, recentReceive)
|
|
||||||
_, err := _DefaultNotifRecentReceive.GetAndUpdateNotifRecentReceive(ctx, id, recentReceive)
|
_, err := _DefaultNotifRecentReceive.GetAndUpdateNotifRecentReceive(ctx, id, recentReceive)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("GetAndUpdateNotifRecentReceive fail, err: %v", err)
|
logger.Error("GetAndUpdateNotifRecentReceive fail, err: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// 新增未读数
|
||||||
|
for nType, incr := range info.UrcMap {
|
||||||
|
err := DefaultService.utilIncrUrc(objMid, nType, incr)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("utilIncrUrc fail, err: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err = _DefaultNotifReceive.OpCreateBatch(ctx, notifReceives)
|
err = _DefaultNotifReceive.OpCreateBatch(ctx, notifReceives)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -277,49 +299,55 @@ func (s *NotifBcstCenter) bcstThroughWebsocket(ctx *gin.Context, nids []int64, o
|
||||||
logger.Error("_DefaultNotification GetListById fail, err: %v", err)
|
logger.Error("_DefaultNotification GetListById fail, err: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
urc := int64(len(nids))
|
|
||||||
|
|
||||||
// 根据通知类型进行广播
|
// 根据通知类型进行广播
|
||||||
switch objType {
|
switch objType {
|
||||||
case consts.Notification_ObjType_Customized:
|
case consts.Notification_ObjType_Customized:
|
||||||
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, objMids)
|
return s.bcstToMidsThroughWebsocket(ctx, notif, objType, objMids)
|
||||||
case consts.Notification_ObjType_AllStreamer:
|
case consts.Notification_ObjType_AllStreamer:
|
||||||
smids, _, err := s.groupOnlineMids(ctx)
|
smids, _, err := s.groupOnlineMids(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("groupOnlineMids fail, resp: %v, err: %v", err)
|
logger.Error("groupOnlineMids fail, resp: %v, err: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, smids)
|
return s.bcstToMidsThroughWebsocket(ctx, notif, objType, smids)
|
||||||
case consts.Notification_ObjType_AllUser:
|
case consts.Notification_ObjType_AllUser:
|
||||||
_, umids, err := s.groupOnlineMids(ctx)
|
_, umids, err := s.groupOnlineMids(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("groupOnlineMids fail, resp: %v, err: %v", err)
|
logger.Error("groupOnlineMids fail, resp: %v, err: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, umids)
|
return s.bcstToMidsThroughWebsocket(ctx, notif, objType, umids)
|
||||||
case consts.Notification_ObjType_AllStreamerAndUser:
|
case consts.Notification_ObjType_AllStreamerAndUser:
|
||||||
mids, err := s.getOnlineMids()
|
smids, umids, err := s.groupOnlineMids(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("getOnlineMids fail, resp: %v, err: %v", err)
|
logger.Error("groupOnlineMids fail, resp: %v, err: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, mids)
|
err = s.bcstToMidsThroughWebsocket(ctx, notif, consts.Notification_ObjType_AllStreamer, smids)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("bcstToMidsThroughWebsocket fail, resp: %v, err: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return s.bcstToMidsThroughWebsocket(ctx, notif, consts.Notification_ObjType_AllUser, umids)
|
||||||
default:
|
default:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *NotifBcstCenter) bcstToMidsThroughWebsocket(ctx *gin.Context, notif *dbstruct.Notification, urc int64, objMids []int64) error {
|
func (s *NotifBcstCenter) bcstToMidsThroughWebsocket(ctx *gin.Context, notif *dbstruct.Notification, objType int64, objMids []int64) error {
|
||||||
for _, mid := range objMids {
|
for _, mid := range objMids {
|
||||||
|
|
||||||
// 先更新未读总数
|
// 若是广播消息,则在客户端进行一次消息的接收
|
||||||
err := DefaultService.utilIncrUrc(mid, notif.GetNType(), urc)
|
if objType != consts.Notification_ObjType_Customized {
|
||||||
if err != nil {
|
err := s.ReceiveAllBcstedNotifs(ctx, mid, objType)
|
||||||
logger.Error("utilIncrUrc fail, err: %v", err)
|
if err != nil {
|
||||||
return err
|
logger.Error("ReceiveAllBcstedNotifs failed, mid: %v, objType: %v, err: %v", mid, objType, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
total, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcTotalIdForRedis(mid))
|
total, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcTotalIdForRedis(mid))
|
||||||
|
|
||||||
param := &firenzeproto.SendBizMsgParam{
|
param := &firenzeproto.SendBizMsgParam{
|
||||||
Mid: mid,
|
Mid: mid,
|
||||||
}
|
}
|
||||||
|
|
|
@ -2201,7 +2201,7 @@ func (s *Service) OpApproveStreamerAuthApprovalBasic(ctx *gin.Context, req *stre
|
||||||
}
|
}
|
||||||
|
|
||||||
// 发送通知
|
// 发送通知
|
||||||
DefaultNotifBuilderHandler.Handle(ctx)(consts.AudNotifTemp_StreamerBasicInfoPassed)(midList)
|
DefaultNotifBuilderHandler.Handle(ctx)(consts.AudNotifTemp_StreamerBasicInfoPassed, consts.CtrlNotifTemp_SyncNotifBcstVersForStreamer)(midList)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
|
@ -109,6 +109,19 @@ type NotifReceive struct {
|
||||||
DelFlag int64 `json:"del_flag" bson:"del_flag"` // 删除标记
|
DelFlag int64 `json:"del_flag" bson:"del_flag"` // 删除标记
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *NotifReceive) Copy() *NotifReceive {
|
||||||
|
return &NotifReceive{
|
||||||
|
Id: p.Id,
|
||||||
|
ObjMid: p.ObjMid,
|
||||||
|
Nid: p.Nid,
|
||||||
|
NType: p.NType,
|
||||||
|
IsRead: p.IsRead,
|
||||||
|
Ct: p.Ct,
|
||||||
|
Ut: p.Ut,
|
||||||
|
DelFlag: p.DelFlag,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type NotifReceivePull struct {
|
type NotifReceivePull struct {
|
||||||
Id int64 `json:"id" bson:"_id"` // 通知接收拉取表id
|
Id int64 `json:"id" bson:"_id"` // 通知接收拉取表id
|
||||||
ObjMid int64 `json:"obj_mid" bson:"obj_mid"` // 通知接收人mid
|
ObjMid int64 `json:"obj_mid" bson:"obj_mid"` // 通知接收人mid
|
||||||
|
|
Loading…
Reference in New Issue