diff --git a/api/consts/status.go b/api/consts/status.go index 34a0c569..71a1879e 100644 --- a/api/consts/status.go +++ b/api/consts/status.go @@ -330,6 +330,13 @@ const ( // 系统通知表的推送状态 const ( - NotificationStatus_Created = 0 //已创建 - NotificationStatus_Pushed = 1 //已推送 + Notification_Created = 0 //待推送 + Notification_Pushed = 1 //已推送 + Notification_FailedToPush = 2 //推送失败 + Notification_Cancelled = 3 //已撤销 +) + +// 系统通知表拉取状态 +const ( + NotifReceivePull_Fail = -1 // 失败 ) diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index 032c305b..5ecacb1c 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -246,6 +246,7 @@ const ( COLNotifBcstVers = "notif_bcst_vers" COLNotifBcstReceiveVers = "notif_bcst_receive_vers" COLNotifReceiveIdSeq = "notif_receive_id_seq" + COLNotifReceivePull = "notif_receive_pull" DBRavenIQTest = "Raven_IQ_test" COLRavenIQTest = "Raven_IQ_test" @@ -655,6 +656,11 @@ func (m *Mongo) getColNotifBcstReceiveVers() *qmgo.Collection { return m.clientMix.Database(DBNotification).Collection(COLNotifBcstReceiveVers) } +// 系统通知接收拉取表 +func (m *Mongo) getColNotifReceivePull() *qmgo.Collection { + return m.clientMix.Database(DBNotification).Collection(COLNotifReceivePull) +} + // 瑞文智商测试表 func (m *Mongo) getColRavenIQTest() *qmgo.Collection { return m.clientMix.Database(DBRavenIQTest).Collection(COLRavenIQTest) @@ -6383,15 +6389,16 @@ func (m *Mongo) DeleteNotifBcstByIds(ctx *gin.Context, ids []int64) error { return err } -func (m *Mongo) GetNotifBcstListByVersRange(ctx *gin.Context, vers_lb, vers_ub, offset, limit int64) ([]*dbstruct.NotifBcst, error) { +func (m *Mongo) GetNotifBcstListByVersRange(ctx *gin.Context, objType, versLb, versUb, offset, limit int64) ([]*dbstruct.NotifBcst, error) { list := make([]*dbstruct.NotifBcst, 0) col := m.getColNotifBcst() query := qmgo.M{ "vers": qmgo.M{ - "$gte": vers_lb, - "$lte": vers_ub, + "$gte": versLb, + "$lte": versUb, }, + "obj_type": objType, "del_flag": 0, } err := col.Find(ctx, query).Sort("-ct").Skip(offset).Limit(limit).All(&list) @@ -6441,6 +6448,13 @@ func (m *Mongo) GetNotifReceiveListByObjMid(ctx *gin.Context, objMid, offset, li return list, err } +// 通知接收表 +func (m *Mongo) CreateNotifReceivePull(ctx *gin.Context, notifReceivePull *dbstruct.NotifReceivePull) error { + col := m.getColNotifReceivePull() + _, err := col.InsertOne(ctx, notifReceivePull) + return err +} + // 瑞文智商测试表相关 func (m *Mongo) CreateRavenIQTest(ctx *gin.Context, Raven_IQ_test *dbstruct.RavenIQTest) error { col := m.getColRavenIQTest() diff --git a/app/mix/service/logic/notif_bcst.go b/app/mix/service/logic/notif_bcst.go index 94b9ebe4..7cf20abb 100644 --- a/app/mix/service/logic/notif_bcst.go +++ b/app/mix/service/logic/notif_bcst.go @@ -45,8 +45,8 @@ func (p *NotifBcst) OpDeleteByIds(ctx *gin.Context, ids []int64) error { return nil } -func (p *NotifBcst) OpListByVersRange(ctx *gin.Context, vers_lb, vers_ub, offset, limit int64) ([]*dbstruct.NotifBcst, error) { - list, err := p.store.GetNotifBcstListByVersRange(ctx, vers_lb, vers_ub, offset, limit) +func (p *NotifBcst) OpListByVersRange(ctx *gin.Context, objType, versLb, versUb, offset, limit int64) ([]*dbstruct.NotifBcst, error) { + list, err := p.store.GetNotifBcstListByVersRange(ctx, objType, versLb, versUb, offset, limit) if err != nil { logger.Error("GetNotificationListByMid fail, err: %v", err) return make([]*dbstruct.NotifBcst, 0), err diff --git a/app/mix/service/logic/notif_receive_pull.go b/app/mix/service/logic/notif_receive_pull.go new file mode 100644 index 00000000..b9d75ef2 --- /dev/null +++ b/app/mix/service/logic/notif_receive_pull.go @@ -0,0 +1,37 @@ +package logic + +import ( + "service/api/consts" + "service/app/mix/dao" + "service/dbstruct" + "service/library/idgenerator" + "service/library/logger" + "time" + + "github.com/gin-gonic/gin" +) + +type NotifReceivePull struct { + store *dao.Store +} + +func NewNotifReceivePull(store *dao.Store) (a *NotifReceivePull) { + a = &NotifReceivePull{ + store: store, + } + return +} + +func (p *NotifReceivePull) OpCreate(ctx *gin.Context, notifReceivePull *dbstruct.NotifReceivePull) error { + + notifReceivePull.Id = idgenerator.GenNotifReceivePullId() + notifReceivePull.Ct = time.Now().Unix() + notifReceivePull.Ut = time.Now().Unix() + notifReceivePull.DelFlag = consts.Exist + err := p.store.CreateNotifReceivePull(ctx, notifReceivePull) + if err != nil { + logger.Error("CreateNotifReceivePull fail, err: %v", err) + return err + } + return nil +} diff --git a/app/mix/service/notification_center.go b/app/mix/service/notification_center.go index 5c7899b3..088faeaa 100644 --- a/app/mix/service/notification_center.go +++ b/app/mix/service/notification_center.go @@ -2,6 +2,7 @@ package service import ( "service/api/consts" + "service/dbstruct" "service/library/logger" "github.com/gin-gonic/gin" @@ -16,31 +17,56 @@ func NewNotifBcstCenter() *NotifBcstCenter { return new(NotifBcstCenter) } -// 写入一条主播全局广播消息 -func (s *NotifBcstCenter) BcstANotifToAllStreamers() error { +// 写入一条全局广播消息 +func (s *NotifBcstCenter) BcstANotif(ctx *gin.Context, objType int64, nids []int64) error { + // 获取最新的主播/用户方广播版本号 + notifBcstVers, err := _DefaultNotifBcstVers.GetAndUpdateNotifBcstVers(ctx, objType) + if err != nil { + logger.Error("GetAndUpdateNotifBcstVers fail, err: %v", err) + return err + } + err = _DefaultNotifBcst.OpCreate(ctx, &dbstruct.NotifBcst{ + Nids: nids, + Vers: notifBcstVers.Vers, + ObjType: objType, + }) + if err != nil { + logger.Error("_DefaultNotifBcst OpCreate fail, err: %v", err) + return err + } + return nil } -// 写入一条用户全局广播消息 -func (s *NotifBcstCenter) BcstANotifToAllUsers() error { - -} - -// 写入一条主播和用户全局广播消息 -func (s *NotifBcstCenter) BcstANotifToAllStreamersAndUsers() error { - +// 直接推送至用户 +func (s *NotifBcstCenter) PushNotifsToMids(ctx *gin.Context, nids []int64, mids []int64) error { + notifReceives := make([]*dbstruct.NotifReceive, 0) + for _, mid := range mids { + for _, nid := range nids { + notifReceives = append(notifReceives, &dbstruct.NotifReceive{ + ObjMid: mid, + Nid: nid, + }) + } + } + err := _DefaultNotifReceive.OpCreateBatch(ctx, notifReceives) + if err != nil { + logger.Error("OpCreateBatch fail, err: %v", err) + return err + } + return nil } // 主播获取全局广播消息 -func (s *NotifBcstCenter) ReceiveAllBcstedNotifsAsStreamer(ctx *gin.Context, mid int64) error { - // 获取当前主播全局广播的版本号 - notifBcstVers, err := _DefaultNotifBcstVers.GetNotifBcstVers(ctx, consts.Notification_ObjType_AllStreamer) +func (s *NotifBcstCenter) ReceiveAllBcstedNotifs(ctx *gin.Context, mid, objType int64) error { + // 获取当前主播/用户全局广播的版本号(注意这个版本号-1才是当前数据库有的最新版本号) + notifBcstVers, err := _DefaultNotifBcstVers.GetNotifBcstVers(ctx, objType) if err != nil { logger.Error("GetNotifBcstVers fail, err: %v", err) return err } - // 获取该主播已接收的全局广播版本号 + // 获取该主播/用户已接收的全局广播版本号,并直接更新至最新的全局广播版本号 notifBcstReceiveVers, err := _DefaultNotifBcstVers.GetAndUpdateNotifBcstReceiveVers(ctx, mid, notifBcstVers.Vers) if err != nil { logger.Error("GetAndUpdateNotifBcstReceiveVers fail, err: %v", err) @@ -52,11 +78,52 @@ func (s *NotifBcstCenter) ReceiveAllBcstedNotifsAsStreamer(ctx *gin.Context, mid return nil } - // 如果版本号不一致,则接收新广播的消息 - _DefaultNotifBcst.OpListByVersRange(ctx, notifBcstReceiveVers.Vers, notifBcstVers.Vers-1, 0, 1000) + // 如果版本号不一致,则拉取新广播的消息(暂定最多接收1000条) + err = s.pullAllBcstedNotifs(ctx, notifBcstVers.Vers, notifBcstReceiveVers.Vers, mid, objType) + if err != nil { + logger.Error("pullAllBcstedNotifs fail, err: %v", err) + // 将失败的拉取结果存入表内 + err := _DefaultNotifReceivePull.OpCreate(ctx, &dbstruct.NotifReceivePull{ + ObjMid: mid, + VersBefore: notifBcstVers.Vers, + VersAfter: notifBcstReceiveVers.Vers, + Status: consts.NotifReceivePull_Fail, + Desc: err.Error(), + }) + if err != nil { + logger.Error("_DefaultNotifReceivePull OpCreate fail, err: %v", err) + } + return err + } + + return nil } -// 用户获取全局广播消息 -func (s *NotifBcstCenter) ReceiveAllBcstedNotifsAsUser(mid int64) error { - +// 拉取新消息 +func (s *NotifBcstCenter) pullAllBcstedNotifs(ctx *gin.Context, vers, receiveVers, objMid, objType int64) error { + vub := vers - 1 + vlb := vub - 999 + if vlb < receiveVers { + vlb = receiveVers + } + notifBcsts, err := _DefaultNotifBcst.OpListByVersRange(ctx, objType, vlb, vub, 0, 1000) + if err != nil { + logger.Error("OpListByVersRange fail, err: %v", err) + return err + } + notifReceives := make([]*dbstruct.NotifReceive, 0) + for _, notifBcst := range notifBcsts { + for _, nid := range notifBcst.Nids { + notifReceives = append(notifReceives, &dbstruct.NotifReceive{ + ObjMid: objMid, + Nid: nid, + }) + } + } + err = _DefaultNotifReceive.OpCreateBatch(ctx, notifReceives) + if err != nil { + logger.Error("OpCreateBatch fail, err: %v", err) + return err + } + return nil } diff --git a/app/mix/service/service.go b/app/mix/service/service.go index f1b85365..87002ffc 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -156,6 +156,7 @@ var ( _DefaultNotifBcstVers *logic.NotifBcstVers _DefaultNotifBcst *logic.NotifBcst _DefaultNotifReceive *logic.NotifReceive + _DefaultNotifReceivePull *logic.NotifReceivePull ) type Service struct { @@ -5016,6 +5017,8 @@ func (s *Service) OpGetSingleDistributeHisList(ctx *gin.Context, req *single_dis func (s *Service) OpCreateNotification(ctx *gin.Context, req *notificationproto.OpCreateReq) (ec errcode.ErrCode) { ec = errcode.ErrCodeNotificationSrvOk + // 首先将该通知的信息插入数据库,获得nid + req.Notification.Status = goproto.Int64(consts.Notification_Created) err := _DefaultNotification.OpCreate(ctx, req) if err != nil { logger.Error("OpCreate fail, req: %v, err: %v", util.ToJson(req), err) @@ -5023,6 +5026,8 @@ func (s *Service) OpCreateNotification(ctx *gin.Context, req *notificationproto. return } + // 根据推送时间判断 + return } diff --git a/dbstruct/notification.go b/dbstruct/notification.go index 3f333248..911e5ac0 100644 --- a/dbstruct/notification.go +++ b/dbstruct/notification.go @@ -43,3 +43,15 @@ type NotifReceive struct { Ut int64 `json:"ut" bson:"ut"` // 更新时间 DelFlag int64 `json:"del_flag" bson:"del_flag"` // 删除标记 } + +type NotifReceivePull struct { + Id int64 `json:"id" bson:"_id"` // 通知接收拉取表id + ObjMid int64 `json:"obj_mid" bson:"obj_mid"` // 通知接收人mid + VersBefore int64 `json:"vers_before" bson:"vers_before"` // 此前版本号 + VersAfter int64 `json:"vers_after" bson:"vers_after"` // 新版本号 + Status int64 `json:"status" bson:"status"` // 状态 + Desc string `json:"desc" bson:"desc"` // 描述 + Ct int64 `json:"ct" bson:"ct"` // 创建时间 + Ut int64 `json:"ut" bson:"ut"` // 更新时间 + DelFlag int64 `json:"del_flag" bson:"del_flag"` // 删除标记 +} diff --git a/library/idgenerator/genid.go b/library/idgenerator/genid.go index b808deb8..e70874ed 100644 --- a/library/idgenerator/genid.go +++ b/library/idgenerator/genid.go @@ -58,6 +58,7 @@ const ( NodeSingleDistributeHis // node 慧用工下发打款历史表 NodeNotification // node 系统通知表 NodeNotifBcst // node 系统通知广播表 + NodeNotifReceivePull // node 系统通知接收拉取表 ) func GenIdInt64(node int64) (int64, error) { @@ -287,3 +288,9 @@ func GenNotifBcstId() int64 { id, _ := GenIdInt64(NodeNotifBcst) return id } + +// notif_receive_pull +func GenNotifReceivePullId() int64 { + id, _ := GenIdInt64(NodeNotifReceivePull) + return id +}