diff --git a/app/mix/service/notif_bcst_center.go b/app/mix/service/notif_bcst_center.go index 55ee2f6c..e6019b14 100644 --- a/app/mix/service/notif_bcst_center.go +++ b/app/mix/service/notif_bcst_center.go @@ -104,49 +104,39 @@ func (s *NotifBcstCenter) bcstNotifsToAll(ctx *gin.Context, nids []int64, objTyp // 直接推送至用户 func (s *NotifBcstCenter) pushNotifsToMids(ctx *gin.Context, nids []int64, objMids []int64) error { - // 查询得到消息 - notifMap := make(map[int64]*dbstruct.Notification) - notifs, err := _DefaultNotification.GetListByIds(ctx, nids) + // 查询得到信息 + info, err := s.extractNotifInfo(ctx, nids) if err != nil { - logger.Error("GetNotificationListByIds fail, err: %v", err) + logger.Error("extractNotifInfo fail, err: %v", err) return err } - for _, notif := range notifs { - notifMap[notif.GetId()] = notif - } notifReceives := make([]*dbstruct.NotifReceive, 0) - midRecentReceiveMap := make(map[int64]map[int64]*dbstruct.NotifRecentReceive) // 最近的一条通知nid - for _, mid := range objMids { - recentReceiveMap := make(map[int64]*dbstruct.NotifRecentReceive) - for _, nid := range nids { - notifReceive := &dbstruct.NotifReceive{ - ObjMid: mid, - Nid: nid, - NType: notifMap[nid].GetNType(), - IsRead: consts.NotifReceive_NotRead, - Ct: notifMap[nid].GetRt(), - } - notifReceives = append(notifReceives, notifReceive) - s.tryToReplaceRecentReceive(recentReceiveMap, notifReceive) - } - midRecentReceiveMap[mid] = recentReceiveMap - } - logger.Info("trying to update recent_receive...") - logger.Info("mid_recent_receive_map: %v", midRecentReceiveMap) - // 尝试更新最近接收消息记录 - for mid, recentReceiveMap := range midRecentReceiveMap { - logger.Info("recent_receive_map: %v", recentReceiveMap) - for nType, recentReceive := range recentReceiveMap { + for _, mid := range objMids { + // 复制信息 + for _, v := range info.NotifReceives { + notifReceive := v.Copy() + notifReceive.ObjMid = mid + notifReceives = append(notifReceives, notifReceive) + } + // 尝试更新最近接收消息记录 + for nType, recentReceive := range info.RecentReceiveMap { id := util.GetNotifRecentReceiveId(mid, nType) - logger.Info("trying to update recent_receive for %v to %v", id, recentReceive) _, err := _DefaultNotifRecentReceive.GetAndUpdateNotifRecentReceive(ctx, id, recentReceive) if err != nil { logger.Error("GetAndUpdateNotifRecentReceive fail, err: %v", err) return err } } + // 新增未读数 + for nType, incr := range info.UrcMap { + err := DefaultService.utilIncrUrc(mid, nType, incr) + if err != nil { + logger.Error("utilIncrUrc fail, err: %v", err) + return err + } + } } err = _DefaultNotifReceive.OpCreateBatch(ctx, notifReceives) @@ -158,6 +148,44 @@ func (s *NotifBcstCenter) pushNotifsToMids(ctx *gin.Context, nids []int64, objMi return nil } +type ExtractedNotifReceiveInfo struct { + NotifReceives []*dbstruct.NotifReceive // 接收记录 + RecentReceiveMap map[int64]*dbstruct.NotifRecentReceive // 最近接收消息map,从notification生成 + UrcMap map[int64]int64 // 需要自增的map,从notification生成 +} + +// 提取消息数组的信息 +func (s *NotifBcstCenter) extractNotifInfo(ctx *gin.Context, nids []int64) (info *ExtractedNotifReceiveInfo, recenterr error) { + + info = &ExtractedNotifReceiveInfo{ + NotifReceives: make([]*dbstruct.NotifReceive, 0), + RecentReceiveMap: make(map[int64]*dbstruct.NotifRecentReceive), + UrcMap: make(map[int64]int64), + } + + // 查询得到消息 + notifs, err := _DefaultNotification.GetListByIds(ctx, nids) + if err != nil { + logger.Error("GetNotificationListByIds fail, err: %v", err) + return info, err + } + + for _, notif := range notifs { + // 组装接收记录 + notifReceive := &dbstruct.NotifReceive{ + Nid: notif.GetId(), + NType: notif.GetNType(), + IsRead: consts.NotifReceive_NotRead, + Ct: notif.GetRt(), + } + info.NotifReceives = append(info.NotifReceives, notifReceive) + s.tryToReplaceRecentReceive(info.RecentReceiveMap, notifReceive) + info.UrcMap[notif.GetNType()]++ + } + + return +} + // 主播获取全局广播消息 func (s *NotifBcstCenter) ReceiveAllBcstedNotifs(ctx *gin.Context, mid, objType int64) error { // 获取当前主播/用户全局广播的版本号(注意这个版本号-1才是当前数据库有的最新版本号) @@ -214,51 +242,45 @@ func (s *NotifBcstCenter) pullAllBcstedNotifs(ctx *gin.Context, vers, receiveVer logger.Error("OpListByVersRange fail, err: %v", err) return err } - // 初始化接收表记录 - notifReceives := make([]*dbstruct.NotifReceive, 0) - notifReceiveMap := make(map[int64]*dbstruct.NotifReceive) + + // 获取nids nids := make([]int64, 0) - // 填充接收表记录 for _, notifBcst := range notifBcsts { - for _, nid := range notifBcst.Nids { - notifReceive := &dbstruct.NotifReceive{ - ObjMid: objMid, - Nid: nid, - IsRead: consts.NotifReceive_NotRead, - } - notifReceives = append(notifReceives, notifReceive) - notifReceiveMap[nid] = notifReceive - nids = append(nids, nid) - } + nids = append(nids, notifBcst.Nids...) } - // 初始化最近接收记录信息 - recentReceiveMap := make(map[int64]*dbstruct.NotifRecentReceive) // 最近的一条通知nid - // 查询通知详细信息 - notifs, err := _DefaultNotification.GetListByIds(ctx, nids) + + // 查询得到接收表信息 + info, err := s.extractNotifInfo(ctx, nids) if err != nil { - logger.Error("GetNotificationListByIds fail, err: %v", err) + logger.Error("extractNotifInfo fail, err: %v", err) return err } - // 补足接收表的接收时间和消息类型 - for _, notif := range notifs { - notifReceive, ok := notifReceiveMap[notif.GetId()] - if ok { - notifReceive.NType = notif.GetNType() - notifReceive.Ct = notif.GetRt() - s.tryToReplaceRecentReceive(recentReceiveMap, notifReceive) - } + + notifReceives := make([]*dbstruct.NotifReceive, 0) + + // 复制信息 + for _, v := range info.NotifReceives { + notifReceive := v.Copy() + notifReceive.ObjMid = objMid + notifReceives = append(notifReceives, notifReceive) } // 尝试更新最近接收消息记录 - logger.Info("recent_receive_map: %v", recentReceiveMap) - for nType, recentReceive := range recentReceiveMap { + for nType, recentReceive := range info.RecentReceiveMap { id := util.GetNotifRecentReceiveId(objMid, nType) - logger.Info("trying to update recent_receive for %v to %v", id, recentReceive) _, err := _DefaultNotifRecentReceive.GetAndUpdateNotifRecentReceive(ctx, id, recentReceive) if err != nil { logger.Error("GetAndUpdateNotifRecentReceive fail, err: %v", err) return err } } + // 新增未读数 + for nType, incr := range info.UrcMap { + err := DefaultService.utilIncrUrc(objMid, nType, incr) + if err != nil { + logger.Error("utilIncrUrc fail, err: %v", err) + return err + } + } err = _DefaultNotifReceive.OpCreateBatch(ctx, notifReceives) if err != nil { @@ -277,49 +299,55 @@ func (s *NotifBcstCenter) bcstThroughWebsocket(ctx *gin.Context, nids []int64, o logger.Error("_DefaultNotification GetListById fail, err: %v", err) return err } - urc := int64(len(nids)) // 根据通知类型进行广播 switch objType { case consts.Notification_ObjType_Customized: - return s.bcstToMidsThroughWebsocket(ctx, notif, urc, objMids) + return s.bcstToMidsThroughWebsocket(ctx, notif, objType, objMids) case consts.Notification_ObjType_AllStreamer: smids, _, err := s.groupOnlineMids(ctx) if err != nil { logger.Error("groupOnlineMids fail, resp: %v, err: %v", err) return err } - return s.bcstToMidsThroughWebsocket(ctx, notif, urc, smids) + return s.bcstToMidsThroughWebsocket(ctx, notif, objType, smids) case consts.Notification_ObjType_AllUser: _, umids, err := s.groupOnlineMids(ctx) if err != nil { logger.Error("groupOnlineMids fail, resp: %v, err: %v", err) return err } - return s.bcstToMidsThroughWebsocket(ctx, notif, urc, umids) + return s.bcstToMidsThroughWebsocket(ctx, notif, objType, umids) case consts.Notification_ObjType_AllStreamerAndUser: - mids, err := s.getOnlineMids() + smids, umids, err := s.groupOnlineMids(ctx) if err != nil { - logger.Error("getOnlineMids fail, resp: %v, err: %v", err) + logger.Error("groupOnlineMids fail, resp: %v, err: %v", err) return err } - return s.bcstToMidsThroughWebsocket(ctx, notif, urc, mids) + err = s.bcstToMidsThroughWebsocket(ctx, notif, consts.Notification_ObjType_AllStreamer, smids) + if err != nil { + logger.Error("bcstToMidsThroughWebsocket fail, resp: %v, err: %v", err) + return err + } + return s.bcstToMidsThroughWebsocket(ctx, notif, consts.Notification_ObjType_AllUser, umids) default: return nil } } -func (s *NotifBcstCenter) bcstToMidsThroughWebsocket(ctx *gin.Context, notif *dbstruct.Notification, urc int64, objMids []int64) error { +func (s *NotifBcstCenter) bcstToMidsThroughWebsocket(ctx *gin.Context, notif *dbstruct.Notification, objType int64, objMids []int64) error { for _, mid := range objMids { - // 先更新未读总数 - err := DefaultService.utilIncrUrc(mid, notif.GetNType(), urc) - if err != nil { - logger.Error("utilIncrUrc fail, err: %v", err) - return err + // 若是广播消息,则在客户端进行一次消息的接收 + if objType != consts.Notification_ObjType_Customized { + err := s.ReceiveAllBcstedNotifs(ctx, mid, objType) + if err != nil { + logger.Error("ReceiveAllBcstedNotifs failed, mid: %v, objType: %v, err: %v", mid, objType, err) + } } total, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcTotalIdForRedis(mid)) + param := &firenzeproto.SendBizMsgParam{ Mid: mid, } diff --git a/app/mix/service/service.go b/app/mix/service/service.go index c52071a4..72481b07 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -2201,7 +2201,7 @@ func (s *Service) OpApproveStreamerAuthApprovalBasic(ctx *gin.Context, req *stre } // 发送通知 - DefaultNotifBuilderHandler.Handle(ctx)(consts.AudNotifTemp_StreamerBasicInfoPassed)(midList) + DefaultNotifBuilderHandler.Handle(ctx)(consts.AudNotifTemp_StreamerBasicInfoPassed, consts.CtrlNotifTemp_SyncNotifBcstVersForStreamer)(midList) } return diff --git a/dbstruct/notification.go b/dbstruct/notification.go index ea915d1a..445751b5 100644 --- a/dbstruct/notification.go +++ b/dbstruct/notification.go @@ -109,6 +109,19 @@ type NotifReceive struct { DelFlag int64 `json:"del_flag" bson:"del_flag"` // 删除标记 } +func (p *NotifReceive) Copy() *NotifReceive { + return &NotifReceive{ + Id: p.Id, + ObjMid: p.ObjMid, + Nid: p.Nid, + NType: p.NType, + IsRead: p.IsRead, + Ct: p.Ct, + Ut: p.Ut, + DelFlag: p.DelFlag, + } +} + type NotifReceivePull struct { Id int64 `json:"id" bson:"_id"` // 通知接收拉取表id ObjMid int64 `json:"obj_mid" bson:"obj_mid"` // 通知接收人mid