package mongo import ( "context" "fmt" "time" "github.com/Leufolium/test/dbstruct" "github.com/qiniu/qmgo" ) type UserIdMap struct { Seq int64 `json:"seq" bson:"_id"` //用户业务id序列号 UserId int64 `json:"user_id" bson:"user_id"` //映射后用户业务id序列号 } type Mongo struct { clientMix *qmgo.Client } func NewMongo() (mongo *Mongo, err error) { mongo = new(Mongo) mongo.clientMix, err = NewMongoClient() if err != nil { fmt.Printf("NewMongoClient fail, cfg: %v, err: %v", nil, err) return } return } const ( DBUserIdSeq = "user_id_seq" COLUserIdSeq = "user_id_seq" COLUserIdMap = "user_id_map" DBAccountIdSeq = "account_id_seq" COLAccountIdSeq = "account_id_seq" DBAccount = "account" COLAccount = "account" DBLogin = "login" COLLogin = "login" DBZone = "zone" COLZone = "zone" DBZoneMoment = "zone_moment" COLZoneMoment = "zone_moment" DBStreamer = "streamer" COLStreamer = "streamer" DBVas = "vas" COLProduct = "product" COLUserVasInfo = "user_vas_info" COLOpLogOrder = "oplog_order_%d" COLOpLogCoinOrder = "oplog_coin_order_%d" COLZoneVas = "zone_vas" COLZoneMomentPrice = "zone_moment_price" COLZoneMomentStat = "zone_moment_stat" COLUserIncome = "user_income" COLWithdrawHis = "withdraw_his" ) func (m *Mongo) getColUserIdSeq() *qmgo.Collection { return m.clientMix.Database(DBUserIdSeq).Collection(COLUserIdSeq) } func (m *Mongo) getColUserIdMap() *qmgo.Collection { return m.clientMix.Database(DBUserIdSeq).Collection(COLUserIdMap) } // Account表 func (m *Mongo) getColAccount() *qmgo.Collection { return m.clientMix.Database(DBAccount).Collection(COLAccount) } // 登录表 func (m *Mongo) getColLogin() *qmgo.Collection { return m.clientMix.Database(DBLogin).Collection(COLLogin) } // AccountIdSeq序列表 func (m *Mongo) getColAccountIdSeq() *qmgo.Collection { return m.clientMix.Database(DBAccountIdSeq).Collection(COLAccountIdSeq) } // 空间表 func (m *Mongo) getColZone() *qmgo.Collection { return m.clientMix.Database(DBZone).Collection(COLZone) } // 私密圈动态表 func (m *Mongo) getColZoneMoment() *qmgo.Collection { return m.clientMix.Database(DBZoneMoment).Collection(COLZoneMoment) } // 主播表 func (m *Mongo) getColStreamer() *qmgo.Collection { return m.clientMix.Database(DBStreamer).Collection(COLStreamer) } // 用户增值信息,微信价格等 func (m *Mongo) getColUserVasInfo() *qmgo.Collection { return m.clientMix.Database(DBVas).Collection(COLUserVasInfo) } // 空间增值信息 func (m *Mongo) getColZoneVas() *qmgo.Collection { return m.clientMix.Database(DBVas).Collection(COLZoneVas) } func (m *Mongo) CreateBatch(ctx context.Context, userIdSeqs []*UserIdMap) error { col := m.getColUserIdMap() _, err := col.InsertMany(ctx, userIdSeqs) return err } // account相关 func (m *Mongo) CreateAccount(ctx context.Context, account *dbstruct.Account) error { col := m.getColAccount() _, err := col.InsertOne(ctx, account) return err } func (m *Mongo) CreateLogin(ctx context.Context, login *dbstruct.Login) error { col := m.getColLogin() _, err := col.InsertOne(ctx, login) return err } func (m *Mongo) GetAndUpdateAccountIdSeq(ctx context.Context) (accountIdSeq *dbstruct.AccountIdSeq, err error) { col := m.getColAccountIdSeq() change := qmgo.Change{ Update: qmgo.M{"$inc": qmgo.M{"seq": 1}}, Upsert: true, ReturnNew: false, } accountIdSeqInstance := dbstruct.AccountIdSeq{} if err = col.Find(ctx, qmgo.M{"_id": "account_id_seq_id"}).Apply(change, &accountIdSeqInstance); err != nil { fmt.Printf("change error : %v", err) return } return &accountIdSeqInstance, err } func (m *Mongo) GetAndUpdateUserIdSeq(ctx context.Context) (userIdSeq *dbstruct.UserIdSeq, err error) { col := m.getColUserIdSeq() change := qmgo.Change{ Update: qmgo.M{"$inc": qmgo.M{"seq": 1}}, Upsert: true, ReturnNew: false, } userIdSeqInstance := dbstruct.UserIdSeq{} if err = col.Find(ctx, qmgo.M{"_id": "user_id_seq_id"}).Apply(change, &userIdSeqInstance); err != nil { fmt.Printf("change error : %v", err) return } return &userIdSeqInstance, err } func (m *Mongo) GetMappedUserId(ctx context.Context, userIdSeq int64) (*dbstruct.UserIdMap, error) { userIdMap := &dbstruct.UserIdMap{} col := m.getColUserIdMap() query := qmgo.M{ "_id": userIdSeq, } err := col.Find(ctx, query).One(&userIdMap) return userIdMap, err } func (m *Mongo) GetAccountListByMid(ctx context.Context, mid int64) (*dbstruct.Account, error) { col := m.getColAccount() account := &dbstruct.Account{} query := qmgo.M{ "_id": mid, } err := col.Find(ctx, query).One(account) if err == qmgo.ErrNoSuchDocuments { err = nil return nil, err } if err != nil { return nil, err } return account, err } func (m *Mongo) GetAccountList(ctx context.Context) ([]*dbstruct.Account, error) { col := m.getColAccount() list := make([]*dbstruct.Account, 0) query := qmgo.M{} err := col.Find(ctx, query).All(&list) if err == qmgo.ErrNoSuchDocuments { err = nil return nil, err } if err != nil { return nil, err } return list, err } func (m *Mongo) GetAccountListByMids(ctx context.Context, mids []int64) ([]*dbstruct.Account, error) { col := m.getColAccount() list := make([]*dbstruct.Account, 0) query := qmgo.M{ "_id": qmgo.M{ "$in": mids, }, "del_flag": 0, } err := col.Find(ctx, query).All(&list) if err == qmgo.ErrNoSuchDocuments { err = nil return nil, err } if err != nil { return nil, err } return list, err } func (m *Mongo) UpdateAccount(ctx context.Context, account *dbstruct.Account) error { col := m.getColAccount() set := qmgo.M{ "user_id_string": *account.UserIdString, } set["ut"] = time.Now().Unix() up := qmgo.M{ "$set": set, } err := col.UpdateId(ctx, *account.Mid, up) return err } func (m *Mongo) GetZoneListByMids(ctx context.Context, mids []int64) ([]*dbstruct.Zone, error) { col := m.getColZone() list := make([]*dbstruct.Zone, 0) query := qmgo.M{ "del_flag": 0, "mid": qmgo.M{ "$in": mids, }, } err := col.Find(ctx, query).All(&list) if err == qmgo.ErrNoSuchDocuments { err = nil return list, err } return list, err } func (m *Mongo) GetZoneMomentCountByMid(ctx context.Context, mid int64) (int64, error) { col := m.getColZoneMoment() query := qmgo.M{ "mid": mid, "del_flag": 0, } return col.Find(ctx, query).Count() } func (m *Mongo) GetStreamerList(ctx context.Context, offset, limit int) ([]*dbstruct.Streamer, error) { list := make([]*dbstruct.Streamer, 0) col := m.getColStreamer() query := qmgo.M{ "del_flag": 0, } err := col.Find(ctx, query).Sort("_id").Skip(int64(offset)).Limit(int64(limit)).All(&list) if err == qmgo.ErrNoSuchDocuments { err = nil return list, err } return list, err } // 用户增值信息 func (m *Mongo) GetUserVasInfoByMids(ctx context.Context, mids []int64) ([]*dbstruct.UserVasInfo, error) { list := make([]*dbstruct.UserVasInfo, 0) col := m.getColUserVasInfo() query := qmgo.M{ "_id": qmgo.M{ "$in": mids, }, } err := col.Find(ctx, query).All(&list) if err == qmgo.ErrNoSuchDocuments { err = nil return nil, nil } if err != nil { return nil, err } return list, nil } // 空间价格信息 func (m *Mongo) GetZoneVasByIds(ctx context.Context, zids []int64) ([]*dbstruct.ZoneVas, error) { list := make([]*dbstruct.ZoneVas, 0) col := m.getColZoneVas() query := qmgo.M{ "_id": qmgo.M{ "$in": zids, }, } err := col.Find(ctx, query).One(&list) if err == qmgo.ErrNoSuchDocuments { err = nil return nil, nil } if err != nil { return nil, err } return list, nil }