by Robin at 20241018

This commit is contained in:
Leufolium 2024-10-18 16:41:06 +08:00
parent 9ebe86919d
commit b5979089b9
8 changed files with 175 additions and 26 deletions

View File

@ -330,6 +330,13 @@ const (
// 系统通知表的推送状态
const (
NotificationStatus_Created = 0 //已创建
NotificationStatus_Pushed = 1 //已推送
Notification_Created = 0 //待推送
Notification_Pushed = 1 //已推送
Notification_FailedToPush = 2 //推送失败
Notification_Cancelled = 3 //已撤销
)
// 系统通知表拉取状态
const (
NotifReceivePull_Fail = -1 // 失败
)

View File

@ -246,6 +246,7 @@ const (
COLNotifBcstVers = "notif_bcst_vers"
COLNotifBcstReceiveVers = "notif_bcst_receive_vers"
COLNotifReceiveIdSeq = "notif_receive_id_seq"
COLNotifReceivePull = "notif_receive_pull"
DBRavenIQTest = "Raven_IQ_test"
COLRavenIQTest = "Raven_IQ_test"
@ -655,6 +656,11 @@ func (m *Mongo) getColNotifBcstReceiveVers() *qmgo.Collection {
return m.clientMix.Database(DBNotification).Collection(COLNotifBcstReceiveVers)
}
// 系统通知接收拉取表
func (m *Mongo) getColNotifReceivePull() *qmgo.Collection {
return m.clientMix.Database(DBNotification).Collection(COLNotifReceivePull)
}
// 瑞文智商测试表
func (m *Mongo) getColRavenIQTest() *qmgo.Collection {
return m.clientMix.Database(DBRavenIQTest).Collection(COLRavenIQTest)
@ -6383,15 +6389,16 @@ func (m *Mongo) DeleteNotifBcstByIds(ctx *gin.Context, ids []int64) error {
return err
}
func (m *Mongo) GetNotifBcstListByVersRange(ctx *gin.Context, vers_lb, vers_ub, offset, limit int64) ([]*dbstruct.NotifBcst, error) {
func (m *Mongo) GetNotifBcstListByVersRange(ctx *gin.Context, objType, versLb, versUb, offset, limit int64) ([]*dbstruct.NotifBcst, error) {
list := make([]*dbstruct.NotifBcst, 0)
col := m.getColNotifBcst()
query := qmgo.M{
"vers": qmgo.M{
"$gte": vers_lb,
"$lte": vers_ub,
"$gte": versLb,
"$lte": versUb,
},
"obj_type": objType,
"del_flag": 0,
}
err := col.Find(ctx, query).Sort("-ct").Skip(offset).Limit(limit).All(&list)
@ -6441,6 +6448,13 @@ func (m *Mongo) GetNotifReceiveListByObjMid(ctx *gin.Context, objMid, offset, li
return list, err
}
// 通知接收表
func (m *Mongo) CreateNotifReceivePull(ctx *gin.Context, notifReceivePull *dbstruct.NotifReceivePull) error {
col := m.getColNotifReceivePull()
_, err := col.InsertOne(ctx, notifReceivePull)
return err
}
// 瑞文智商测试表相关
func (m *Mongo) CreateRavenIQTest(ctx *gin.Context, Raven_IQ_test *dbstruct.RavenIQTest) error {
col := m.getColRavenIQTest()

View File

@ -45,8 +45,8 @@ func (p *NotifBcst) OpDeleteByIds(ctx *gin.Context, ids []int64) error {
return nil
}
func (p *NotifBcst) OpListByVersRange(ctx *gin.Context, vers_lb, vers_ub, offset, limit int64) ([]*dbstruct.NotifBcst, error) {
list, err := p.store.GetNotifBcstListByVersRange(ctx, vers_lb, vers_ub, offset, limit)
func (p *NotifBcst) OpListByVersRange(ctx *gin.Context, objType, versLb, versUb, offset, limit int64) ([]*dbstruct.NotifBcst, error) {
list, err := p.store.GetNotifBcstListByVersRange(ctx, objType, versLb, versUb, offset, limit)
if err != nil {
logger.Error("GetNotificationListByMid fail, err: %v", err)
return make([]*dbstruct.NotifBcst, 0), err

View File

@ -0,0 +1,37 @@
package logic
import (
"service/api/consts"
"service/app/mix/dao"
"service/dbstruct"
"service/library/idgenerator"
"service/library/logger"
"time"
"github.com/gin-gonic/gin"
)
type NotifReceivePull struct {
store *dao.Store
}
func NewNotifReceivePull(store *dao.Store) (a *NotifReceivePull) {
a = &NotifReceivePull{
store: store,
}
return
}
func (p *NotifReceivePull) OpCreate(ctx *gin.Context, notifReceivePull *dbstruct.NotifReceivePull) error {
notifReceivePull.Id = idgenerator.GenNotifReceivePullId()
notifReceivePull.Ct = time.Now().Unix()
notifReceivePull.Ut = time.Now().Unix()
notifReceivePull.DelFlag = consts.Exist
err := p.store.CreateNotifReceivePull(ctx, notifReceivePull)
if err != nil {
logger.Error("CreateNotifReceivePull fail, err: %v", err)
return err
}
return nil
}

View File

@ -2,6 +2,7 @@ package service
import (
"service/api/consts"
"service/dbstruct"
"service/library/logger"
"github.com/gin-gonic/gin"
@ -16,31 +17,56 @@ func NewNotifBcstCenter() *NotifBcstCenter {
return new(NotifBcstCenter)
}
// 写入一条主播全局广播消息
func (s *NotifBcstCenter) BcstANotifToAllStreamers() error {
// 写入一条全局广播消息
func (s *NotifBcstCenter) BcstANotif(ctx *gin.Context, objType int64, nids []int64) error {
// 获取最新的主播/用户方广播版本号
notifBcstVers, err := _DefaultNotifBcstVers.GetAndUpdateNotifBcstVers(ctx, objType)
if err != nil {
logger.Error("GetAndUpdateNotifBcstVers fail, err: %v", err)
return err
}
err = _DefaultNotifBcst.OpCreate(ctx, &dbstruct.NotifBcst{
Nids: nids,
Vers: notifBcstVers.Vers,
ObjType: objType,
})
if err != nil {
logger.Error("_DefaultNotifBcst OpCreate fail, err: %v", err)
return err
}
return nil
}
// 写入一条用户全局广播消息
func (s *NotifBcstCenter) BcstANotifToAllUsers() error {
}
// 写入一条主播和用户全局广播消息
func (s *NotifBcstCenter) BcstANotifToAllStreamersAndUsers() error {
// 直接推送至用户
func (s *NotifBcstCenter) PushNotifsToMids(ctx *gin.Context, nids []int64, mids []int64) error {
notifReceives := make([]*dbstruct.NotifReceive, 0)
for _, mid := range mids {
for _, nid := range nids {
notifReceives = append(notifReceives, &dbstruct.NotifReceive{
ObjMid: mid,
Nid: nid,
})
}
}
err := _DefaultNotifReceive.OpCreateBatch(ctx, notifReceives)
if err != nil {
logger.Error("OpCreateBatch fail, err: %v", err)
return err
}
return nil
}
// 主播获取全局广播消息
func (s *NotifBcstCenter) ReceiveAllBcstedNotifsAsStreamer(ctx *gin.Context, mid int64) error {
// 获取当前主播全局广播的版本号
notifBcstVers, err := _DefaultNotifBcstVers.GetNotifBcstVers(ctx, consts.Notification_ObjType_AllStreamer)
func (s *NotifBcstCenter) ReceiveAllBcstedNotifs(ctx *gin.Context, mid, objType int64) error {
// 获取当前主播/用户全局广播的版本号(注意这个版本号-1才是当前数据库有的最新版本号
notifBcstVers, err := _DefaultNotifBcstVers.GetNotifBcstVers(ctx, objType)
if err != nil {
logger.Error("GetNotifBcstVers fail, err: %v", err)
return err
}
// 获取该主播已接收的全局广播版本号
// 获取该主播/用户已接收的全局广播版本号,并直接更新至最新的全局广播版本号
notifBcstReceiveVers, err := _DefaultNotifBcstVers.GetAndUpdateNotifBcstReceiveVers(ctx, mid, notifBcstVers.Vers)
if err != nil {
logger.Error("GetAndUpdateNotifBcstReceiveVers fail, err: %v", err)
@ -52,11 +78,52 @@ func (s *NotifBcstCenter) ReceiveAllBcstedNotifsAsStreamer(ctx *gin.Context, mid
return nil
}
// 如果版本号不一致,则接收新广播的消息
_DefaultNotifBcst.OpListByVersRange(ctx, notifBcstReceiveVers.Vers, notifBcstVers.Vers-1, 0, 1000)
// 如果版本号不一致,则拉取新广播的消息(暂定最多接收1000条)
err = s.pullAllBcstedNotifs(ctx, notifBcstVers.Vers, notifBcstReceiveVers.Vers, mid, objType)
if err != nil {
logger.Error("pullAllBcstedNotifs fail, err: %v", err)
// 将失败的拉取结果存入表内
err := _DefaultNotifReceivePull.OpCreate(ctx, &dbstruct.NotifReceivePull{
ObjMid: mid,
VersBefore: notifBcstVers.Vers,
VersAfter: notifBcstReceiveVers.Vers,
Status: consts.NotifReceivePull_Fail,
Desc: err.Error(),
})
if err != nil {
logger.Error("_DefaultNotifReceivePull OpCreate fail, err: %v", err)
}
return err
}
return nil
}
// 用户获取全局广播消息
func (s *NotifBcstCenter) ReceiveAllBcstedNotifsAsUser(mid int64) error {
// 拉取新消息
func (s *NotifBcstCenter) pullAllBcstedNotifs(ctx *gin.Context, vers, receiveVers, objMid, objType int64) error {
vub := vers - 1
vlb := vub - 999
if vlb < receiveVers {
vlb = receiveVers
}
notifBcsts, err := _DefaultNotifBcst.OpListByVersRange(ctx, objType, vlb, vub, 0, 1000)
if err != nil {
logger.Error("OpListByVersRange fail, err: %v", err)
return err
}
notifReceives := make([]*dbstruct.NotifReceive, 0)
for _, notifBcst := range notifBcsts {
for _, nid := range notifBcst.Nids {
notifReceives = append(notifReceives, &dbstruct.NotifReceive{
ObjMid: objMid,
Nid: nid,
})
}
}
err = _DefaultNotifReceive.OpCreateBatch(ctx, notifReceives)
if err != nil {
logger.Error("OpCreateBatch fail, err: %v", err)
return err
}
return nil
}

View File

@ -156,6 +156,7 @@ var (
_DefaultNotifBcstVers *logic.NotifBcstVers
_DefaultNotifBcst *logic.NotifBcst
_DefaultNotifReceive *logic.NotifReceive
_DefaultNotifReceivePull *logic.NotifReceivePull
)
type Service struct {
@ -5016,6 +5017,8 @@ func (s *Service) OpGetSingleDistributeHisList(ctx *gin.Context, req *single_dis
func (s *Service) OpCreateNotification(ctx *gin.Context, req *notificationproto.OpCreateReq) (ec errcode.ErrCode) {
ec = errcode.ErrCodeNotificationSrvOk
// 首先将该通知的信息插入数据库获得nid
req.Notification.Status = goproto.Int64(consts.Notification_Created)
err := _DefaultNotification.OpCreate(ctx, req)
if err != nil {
logger.Error("OpCreate fail, req: %v, err: %v", util.ToJson(req), err)
@ -5023,6 +5026,8 @@ func (s *Service) OpCreateNotification(ctx *gin.Context, req *notificationproto.
return
}
// 根据推送时间判断
return
}

View File

@ -43,3 +43,15 @@ type NotifReceive struct {
Ut int64 `json:"ut" bson:"ut"` // 更新时间
DelFlag int64 `json:"del_flag" bson:"del_flag"` // 删除标记
}
type NotifReceivePull struct {
Id int64 `json:"id" bson:"_id"` // 通知接收拉取表id
ObjMid int64 `json:"obj_mid" bson:"obj_mid"` // 通知接收人mid
VersBefore int64 `json:"vers_before" bson:"vers_before"` // 此前版本号
VersAfter int64 `json:"vers_after" bson:"vers_after"` // 新版本号
Status int64 `json:"status" bson:"status"` // 状态
Desc string `json:"desc" bson:"desc"` // 描述
Ct int64 `json:"ct" bson:"ct"` // 创建时间
Ut int64 `json:"ut" bson:"ut"` // 更新时间
DelFlag int64 `json:"del_flag" bson:"del_flag"` // 删除标记
}

View File

@ -58,6 +58,7 @@ const (
NodeSingleDistributeHis // node 慧用工下发打款历史表
NodeNotification // node 系统通知表
NodeNotifBcst // node 系统通知广播表
NodeNotifReceivePull // node 系统通知接收拉取表
)
func GenIdInt64(node int64) (int64, error) {
@ -287,3 +288,9 @@ func GenNotifBcstId() int64 {
id, _ := GenIdInt64(NodeNotifBcst)
return id
}
// notif_receive_pull
func GenNotifReceivePullId() int64 {
id, _ := GenIdInt64(NodeNotifReceivePull)
return id
}