by Robin at 20241202

This commit is contained in:
Robin 2024-12-02 15:40:17 +08:00
parent b408f1ff16
commit fc58ef20a2
3 changed files with 74 additions and 17 deletions

View File

@ -4693,6 +4693,11 @@ func (s *Service) ApiReadNotification(ctx *gin.Context, req *notificationproto.A
key := util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, req.NType)
totalKey := util.GetNotifUrcTotalIdForRedis(req.BaseRequest.Mid)
_, err = redis.GetRedisClient().DecrBy(key, result.ModifiedCount)
if err != nil {
logger.Error("Redis DecrBy fail, err: %v", err)
ec = errcode.ErrCodeNotificationSrvFail
return
}
_, err = redis.GetRedisClient().DecrBy(totalKey, result.ModifiedCount)
if err != nil {
logger.Error("Redis DecrBy fail, err: %v", err)
@ -4718,6 +4723,11 @@ func (s *Service) ApiReadAllNotification(ctx *gin.Context, req *notificationprot
key := util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, req.NType)
totalKey := util.GetNotifUrcTotalIdForRedis(req.BaseRequest.Mid)
_, err = redis.GetRedisClient().DecrBy(key, result.ModifiedCount)
if err != nil {
logger.Error("Redis DecrBy fail, err: %v", err)
ec = errcode.ErrCodeNotificationSrvFail
return
}
_, err = redis.GetRedisClient().DecrBy(totalKey, result.ModifiedCount)
if err != nil {
logger.Error("Redis DecrBy fail, err: %v", err)
@ -4746,7 +4756,7 @@ func (s *Service) ApiGetNotificationUrcByMid(ctx *gin.Context, req *notification
stotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, nType))
mp[nType] = stotal
} else {
for k, _ := range cfg.Map {
for k := range cfg.Map {
stotal, _ := redis.GetRedisClient().GetInt64(util.GetNotifUrcIdForRedis(req.BaseRequest.Mid, k))
mp[k] = stotal
}

View File

@ -65,6 +65,12 @@ func (s *NotifBcstCenter) BcstNotifs(ctx *gin.Context, nids []int64, objType int
}
// 长链接广播
go func() {
err := s.bcstThroughWebsocket(ctx, nids, objType, objMids)
if err != nil {
logger.Error("bcstThroughWebsocket fail, err: %v", err)
}
}()
return nil
}
@ -127,6 +133,10 @@ func (s *NotifBcstCenter) pushNotifsToMids(ctx *gin.Context, nids []int64, objMi
for _, mid := range objMids {
for nType, total := range nTypeTotalMap {
_, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcIdForRedis(mid, nType), total)
if err != nil {
logger.Error("Redis IncrBy fail, err: %v", err)
return err
}
_, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcTotalIdForRedis(mid), total)
if err != nil {
logger.Error("Redis IncrBy fail, err: %v", err)
@ -231,6 +241,10 @@ func (s *NotifBcstCenter) pullAllBcstedNotifs(ctx *gin.Context, vers, receiveVer
// 记录未读总数
for nType, total := range nTypeTotalMap {
_, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcIdForRedis(objMid, nType), total)
if err != nil {
logger.Error("Redis IncrBy fail, err: %v", err)
return err
}
_, err = redis.GetRedisClient().IncrBy(util.GetNotifUrcTotalIdForRedis(objMid), total)
if err != nil {
logger.Error("Redis IncrBy fail, err: %v", err)
@ -256,11 +270,26 @@ func (s *NotifBcstCenter) bcstThroughWebsocket(ctx *gin.Context, nids []int64, o
case consts.Notification_ObjType_Customized:
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, objMids)
case consts.Notification_ObjType_AllStreamer:
smids, _, err := s.groupOnlineMids(ctx)
if err != nil {
logger.Error("groupOnlineMids fail, resp: %v, err: %v", err)
return err
}
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, smids)
case consts.Notification_ObjType_AllUser:
_, umids, err := s.groupOnlineMids(ctx)
if err != nil {
logger.Error("groupOnlineMids fail, resp: %v, err: %v", err)
return err
}
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, umids)
case consts.Notification_ObjType_AllStreamerAndUser:
mids, err := s.getOnlineMids()
if err != nil {
logger.Error("getOnlineMids fail, resp: %v, err: %v", err)
return err
}
return s.bcstToMidsThroughWebsocket(ctx, notif, urc, mids)
default:
return nil
}
@ -291,6 +320,21 @@ func (s *NotifBcstCenter) bcstToMidsThroughWebsocket(ctx *gin.Context, notif *db
return nil
}
func (s *NotifBcstCenter) getOnlineMids() (mids []int64, err error) {
// 获取到所有在线用户
param := &firenzeproto.OnlineMidsParam{}
resp, err := firenzeapi.OnlineMids(param)
if err != nil {
logger.Error("OnlineMids fail, resp: %v, err: %v", util.ToJson(resp), err)
return
}
if resp.ErrCode != errcode.ErrCodeOk {
logger.Error("OnlineMids fail, invalid resp, resp: %v, err: %v", util.ToJson(resp), err)
return
}
return resp.Data.Mids, nil
}
func (s *NotifBcstCenter) groupOnlineMids(ctx *gin.Context) (streamerMids []int64, userMids []int64, err error) {
streamerMids = make([]int64, 0)
@ -308,18 +352,14 @@ func (s *NotifBcstCenter) groupOnlineMids(ctx *gin.Context) (streamerMids []int6
return
}
smids := make([]int64, 0)
var smids []int64
smids, err = _DefaultAccount.GetStreamerAccountMids(ctx)
if err != nil {
logger.Error("GetStreamerAccountMids fail, resp: %v, err: %v", util.ToJson(resp), err)
return
}
// 切片
for i := 0; i < len(resp.Data.Mids); i += 5000 {
upper := util.Int64Min(int64(len(resp.Data.Mids)), int64(i+5000-1))
mids := resp.Data.Mids[i:upper]
}
// 切分用户和主播
streamerMids, userMids = util.SplitInt64Slice(resp.Data.Mids, smids)
return
}

View File

@ -490,8 +490,8 @@ func Int64SliceMin(s []int64) int64 {
return min
}
// 将升序排序后的int64数组切分成含s2元素的和不含s2元素的
func SplitSortedInt64Slice(bs []int64, rs []int64) ([]int64, []int64) {
// 将int64数组切分成含s2元素的和不含s2元素的
func SplitInt64Slice(bs []int64, rs []int64) ([]int64, []int64) {
if len(bs) == 0 || len(rs) == 0 {
return bs, make([]int64, 0)
@ -500,10 +500,17 @@ func SplitSortedInt64Slice(bs []int64, rs []int64) ([]int64, []int64) {
s1 := make([]int64, 0)
s2 := make([]int64, 0)
rindex := 0
for _, e := range bs {
if e < rs[rindex] {
continue
rsMp := make(map[int64]bool)
for _, v := range rs {
rsMp[v] = true
}
for _, bv := range bs {
if rsMp[bv] {
s1 = append(s1, bv)
} else {
s2 = append(s2, bv)
}
}
return s1, s2
}