diff --git a/api/proto/account/proto/account_op.go b/api/proto/account/proto/account_op.go index 16edf625..599b42f7 100644 --- a/api/proto/account/proto/account_op.go +++ b/api/proto/account/proto/account_op.go @@ -112,48 +112,6 @@ type OpGetMobilePhoneByUserIdResp struct { Data *OpGetMobilePhoneByUserIdData `json:"data"` } -// 根据UserId模糊查询(和姓名取交集) -type OpListFuzzilyByUserIdReq struct { - base.BaseRequest - UserId *int64 `json:"user_id"` //user_id模糊匹配 - Role *int64 `json:"role"` //角色 - Offset int `json:"offset"` - Limit int `json:"limit"` - Sort []string -} - -type OpListFuzzilyByUserIdData struct { - List []*OpListVO `json:"list"` - Offset int `json:"offset"` - More int `json:"more"` -} - -type OpListFuzzilyByUserIdResp struct { - base.BaseResponse - Data *OpListFuzzilyByUserIdData `json:"data"` -} - -// 根据Name模糊查询 -type OpListFuzzilyByNameReq struct { - base.BaseRequest - Name string `json:"name"` //name模糊匹配 - Role *int64 `json:"role"` //角色 - Offset int `json:"offset"` - Limit int `json:"limit"` - Sort []string -} - -type OpListFuzzilyByNameData struct { - List []*OpListVO `json:"list"` - Offset int `json:"offset"` - More int `json:"more"` -} - -type OpListFuzzilyByNameResp struct { - base.BaseResponse - Data *OpListFuzzilyByNameData `json:"data"` -} - // op 列表-查询他人 type OpListOthersByMidReq struct { base.BaseRequest diff --git a/api/proto/account/proto/not_null_def_op.go b/api/proto/account/proto/not_null_def_op.go index dbf02ee2..608e019a 100644 --- a/api/proto/account/proto/not_null_def_op.go +++ b/api/proto/account/proto/not_null_def_op.go @@ -29,18 +29,6 @@ func (p *OpGetMobilePhoneByUserIdReq) ProvideNotNullValue() (params []*validator return params } -func (p *OpListFuzzilyByUserIdReq) ProvideNotNullValue() (params []*validator.JsonParam) { - params = make([]*validator.JsonParam, 1) - params[0] = validator.NewInt64PtrParam("查询UserId不可为空!", p.UserId) - return params -} - -func (p *OpListFuzzilyByNameReq) ProvideNotNullValue() (params []*validator.JsonParam) { - params = make([]*validator.JsonParam, 1) - params[0] = validator.NewStringParam("查询UserId不可为空!", p.Name) - return params -} - func (p *OpListOthersByMidReq) ProvideNotNullValue() (params []*validator.JsonParam) { params = make([]*validator.JsonParam, 1) params[0] = validator.NewInt64PtrParam("查询Mid不可为空!", p.Mid) diff --git a/api/proto/streamer_acct/proto/streamer_acct_op.go b/api/proto/streamer_acct/proto/streamer_acct_op.go new file mode 100644 index 00000000..5e3ec58a --- /dev/null +++ b/api/proto/streamer_acct/proto/streamer_acct_op.go @@ -0,0 +1,37 @@ +package proto + +import "service/api/base" + +// 根据UserId模糊查询(和姓名取并集) +type OpListFuzzilyByUserIdReq struct { + base.BaseRequest + UserIdString string `json:"user_id_string"` //user_id模糊匹配 + Offset int `json:"offset"` + Limit int `json:"limit"` + Sort []string +} + +type OpListFuzzilyByUserIdData struct { +} + +type OpListFuzzilyByUserIdResp struct { + base.BaseResponse + Data *OpListFuzzilyByUserIdData `json:"data"` +} + +// 根据Name模糊查询 +type OpListFuzzilyByNameReq struct { + base.BaseRequest + Name string `json:"name"` //name模糊匹配 + Offset int `json:"offset"` + Limit int `json:"limit"` + Sort []string +} + +type OpListFuzzilyByNameData struct { +} + +type OpListFuzzilyByNameResp struct { + base.BaseResponse + Data *OpListFuzzilyByNameData `json:"data"` +} diff --git a/app/mix/dao/elasticsearch.go b/app/mix/dao/elasticsearch.go index 59e6464d..eeb51bf1 100644 --- a/app/mix/dao/elasticsearch.go +++ b/app/mix/dao/elasticsearch.go @@ -1,4 +1,125 @@ package dao +import ( + "encoding/json" + "fmt" + "service/app/mix/conf" + "service/bizcommon/util" + "service/dbstruct" + "service/library/elasticsearchdb" + "service/library/logger" + + streameracctproto "service/api/proto/streamer_acct/proto" + + "github.com/gin-gonic/gin" + "github.com/olivere/elastic/v7" +) + +const ( + EsEnvProd = 0 + EsEnvTest = 1 + EsEnvLocal = 2 +) + type ElasticSearch struct { + clientMix *elastic.Client + indexsuffix string +} + +func NewElasticSearch(cfg *conf.ConfigSt) (es *ElasticSearch, err error) { + es = new(ElasticSearch) + + es.clientMix, err = elasticsearchdb.NewElasticSearchClient(cfg.ElasticSearch) + if err != nil { + logger.Error("NewElasticSearchClient fail, cfg: %v, err: %v", util.ToJson(cfg.ElasticSearch), err) + return + } + + es.indexsuffix = cfg.ElasticSearch.IndexSuffix + + return +} + +const ( + IndexStreamerAcct = "streamer_acct" + TypeStreamerAcct = "_doc" +) + +func (es *ElasticSearch) getIndexStreamerAcct() string { + return IndexStreamerAcct + es.indexsuffix +} + +func (es *ElasticSearch) CreateStreamerAcct(ctx *gin.Context, streameraccts []*dbstruct.EsStreamerAcct) error { + bulk := es.clientMix.Bulk().Index(es.getIndexStreamerAcct()).Refresh("true") + for _, streameracct := range streameraccts { + bulk.Add(elastic.NewBulkCreateRequest().Doc(streameracct)) + } + _, err := bulk.Do(ctx) + return err +} + +func (es *ElasticSearch) UpdateStreamerAcct(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcct) error { + _, err := es.clientMix.Update().Index(es.getIndexStreamerAcct()).Id(fmt.Sprint(streameracct.Mid)).Doc(map[string]any{ + "name": streameracct.Name, + "ut": streameracct.Ut, + }).Do(ctx) + return err +} + +func (es *ElasticSearch) GetStreamerAcctListFuzzilyByName(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByNameReq) (list []*dbstruct.EsStreamerAcct, err error) { + + list = make([]*dbstruct.EsStreamerAcct, 0) + + delFlagClause := elastic.NewMatchQuery("del_flag", 0) + nameClause := elastic.NewMatchQuery("name", req.Name) + query := elastic.NewBoolQuery() + query.Must(nameClause, delFlagClause) + + res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx) + if err != nil { + logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err) + return + } + + for _, hit := range res.Hits.Hits { + streameracct := &dbstruct.EsStreamerAcct{} + err = json.Unmarshal(hit.Source, streameracct) + if err != nil { + logger.Error("json Unmarshal fail, err: %v", err) + return + } + list = append(list, streameracct) + } + + return +} + +func (es *ElasticSearch) GetStreamerAcctListFuzzilyByUserId(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByUserIdReq) (list []*dbstruct.EsStreamerAcct, err error) { + + list = make([]*dbstruct.EsStreamerAcct, 0) + + delFlagClause := elastic.NewMatchQuery("del_flag", 0) + nameClause := elastic.NewMatchQuery("name", req.UserIdString) + userIdStringClause := elastic.NewMatchQuery("user_id_string", req.UserIdString) + orClause := elastic.NewBoolQuery().Should(nameClause, userIdStringClause) + query := elastic.NewBoolQuery().Must() + query.Must(orClause, delFlagClause) + + res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx) + if err != nil { + logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err) + return + } + + for _, hit := range res.Hits.Hits { + streameracct := &dbstruct.EsStreamerAcct{} + err = json.Unmarshal(hit.Source, streameracct) + if err != nil { + logger.Error("json Unmarshal fail, err: %v", err) + return + } + list = append(list, streameracct) + } + + return } diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index 01bb8f5f..4aa47572 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -1246,57 +1246,6 @@ func (m *Mongo) GetAccountListByPhoneHash(ctx *gin.Context, phonehash string) ([ return list, err } -func (m *Mongo) GetStreamerAcctListFuzzilyByName(ctx *gin.Context, req *accountproto.OpListFuzzilyByNameReq) ([]*dbstruct.StreamerAcct, error) { - list := make([]*dbstruct.StreamerAcct, 0) - col := m.getColStreamerAcct() - query := qmgo.M{ - "del_flag": 0, - "name": bson.M{ - "$regex": req.Name, - }, - } - if len(req.Sort) == 0 { - req.Sort = append(req.Sort, "-ct") - } - - err := col.Find(ctx, query).Sort(req.Sort...).Skip(int64(req.Offset)).Limit(int64(req.Limit)).All(&list) - if err == qmgo.ErrNoSuchDocuments { - err = nil - return list, err - } - return list, err -} - -func (m *Mongo) GetStreamerAcctListFuzzilyByUserId(ctx *gin.Context, req *accountproto.OpListFuzzilyByUserIdReq) ([]*dbstruct.StreamerAcct, error) { - list := make([]*dbstruct.StreamerAcct, 0) - col := m.getColStreamerAcct() - query := qmgo.M{ - "del_flag": 0, - "$or": []bson.M{ - { - "name": bson.M{ - "$regex": fmt.Sprint(util.DerefInt64(req.UserId)), - }, - }, - { - "user_id_string": bson.M{ - "$regex": fmt.Sprint(util.DerefInt64(req.UserId)), - }, - }, - }, - } - if len(req.Sort) == 0 { - req.Sort = append(req.Sort, "-ct") - } - - err := col.Find(ctx, query).Sort(req.Sort...).Skip(int64(req.Offset)).Limit(int64(req.Limit)).All(&list) - if err == qmgo.ErrNoSuchDocuments { - err = nil - return list, err - } - return list, err -} - func (m *Mongo) UpdateAccountByIds(ctx *gin.Context, account *dbstruct.Account, ids []int64) error { col := m.getColAccount() set := util.EntityToM(account) @@ -1446,25 +1395,6 @@ func (m *Mongo) CreateAccountHis(ctx *gin.Context, accounts []*dbstruct.Account) return err } -// streamer_acct相关 -func (m *Mongo) CreateStreamerAcct(ctx *gin.Context, accounts []*dbstruct.StreamerAcct) error { - col := m.getColStreamerAcct() - _, err := col.InsertMany(ctx, accounts) - return err -} - -func (m *Mongo) UpdateStreamerAcctName(ctx *gin.Context, id int64, name string) error { - col := m.getColStreamerAcct() - up := qmgo.M{ - "$set": qmgo.M{ - "ut": time.Now().Unix(), - "name": name, - }, - } - err := col.UpdateId(ctx, id, up) - return err -} - // vericode相关 func (m *Mongo) CreateVeriCode(ctx *gin.Context, vericode *dbstruct.VeriCode) error { col := m.getColVeriCode() diff --git a/app/mix/dao/store.go b/app/mix/dao/store.go index a0d37e2f..10c359b1 100644 --- a/app/mix/dao/store.go +++ b/app/mix/dao/store.go @@ -53,6 +53,7 @@ type IStore interface { type Store struct { *Mongo *Mysql + *ElasticSearch } func NewStore(cfg *conf.ConfigSt) (store *Store, err error) { @@ -68,9 +69,16 @@ func NewStore(cfg *conf.ConfigSt) (store *Store, err error) { return } + es, err := NewElasticSearch(cfg) + if err != nil { + logger.Error("NewElasticSearch fail, err: %v", err) + return + } + store = &Store{ - Mongo: mongo, - Mysql: mysql, + Mongo: mongo, + Mysql: mysql, + ElasticSearch: es, } return } diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index d3cb28c2..6d3389c9 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -17,6 +17,7 @@ import ( moment_audit_taskproto "service/api/proto/moment_audit_task/proto" realname_authenticationproto "service/api/proto/realname_authentication/proto" streamerproto "service/api/proto/streamer/proto" + streameracctproto "service/api/proto/streamer_acct/proto" streamerauthapprovalproto "service/api/proto/streamerauthapproval/proto" streamerlinkproto "service/api/proto/streamerlink/proto" thumbsupproto "service/api/proto/thumbsup/proto" @@ -364,9 +365,12 @@ func (s *Service) ApiUpdateAccount(ctx *gin.Context, req *accountproto.ApiUpdate return } - if oldAccount.GetRole() == consts.Streamer { - if err := _DefaultAccount.OpUpdateStreamerAcctName(ctx, util.DerefInt64(req.Account.Mid), util.DerefString(req.Account.Name)); err != nil { - logger.Error("OpUpdateStreamerAcctName fail, req: %v, err: %v", util.ToJson(req), err) + if oldAccount.GetRole() == consts.Streamer && req.Account.Name != nil { + if err := _DefaultStreamerAcct.OpUpdate(ctx, &dbstruct.EsStreamerAcct{ + Mid: req.Account.GetMid(), + Name: req.Account.GetName(), + }); err != nil { + logger.Error("OpUpdate fail, req: %v, err: %v", util.ToJson(req), err) ec = errcode.ErrCodeAccountSrvFail return } @@ -1231,24 +1235,22 @@ func (s *Service) ApiGetStreamerExtListFuzzilyByUserId(ctx *gin.Context, req *st ec = errcode.ErrCodeStreamerSrvOk //1.从主播用户表中模糊匹配所有主播信息的用户侧数据,按mid正序排序 - accountList, err := _DefaultAccount.OpListStreamerAcctFuzzilyByUserId(ctx, &accountproto.OpListFuzzilyByUserIdReq{ - UserId: req.UserId, - Role: goproto.Int64(consts.Streamer), - Offset: req.Offset, - Limit: req.Limit, - Sort: []string{"_id"}, + streameraccts, err := _DefaultStreamerAcct.OpListStreamerAcctFuzzilyByUserId(ctx, &streameracctproto.OpListFuzzilyByUserIdReq{ + UserIdString: fmt.Sprint(util.DerefInt64(req.UserId)), + Offset: req.Offset, + Limit: req.Limit, + Sort: []string{"_id"}, }) if err != nil { - logger.Error("Account OpListFuzzilyByUserId fail, req: %v, err: %v", util.ToJson(req), err) + logger.Error("StreamerAcct OpListFuzzilyByUserId fail, req: %v, err: %v", util.ToJson(req), err) ec = errcode.ErrCodeAccountSrvFail return } //2.获取mids mids := make([]int64, 0) - for _, v := range accountList { - mid := util.DerefInt64(v.Mid) - mids = append(mids, mid) + for _, v := range streameraccts { + mids = append(mids, v.Mid) } mp, err := s.utilGetStreamerExtMapByMids(ctx, mids, consts.InterfaceType_Api) @@ -1270,9 +1272,8 @@ func (s *Service) ApiGetStreamerExtListFuzzilyByName(ctx *gin.Context, req *stre ec = errcode.ErrCodeStreamerSrvOk //1.从主播用户表中模糊匹配所有主播信息的用户侧数据,按mid正序排序 - accountList, err := _DefaultAccount.OpListStreamerAcctFuzzilyByName(ctx, &accountproto.OpListFuzzilyByNameReq{ + streameraccts, err := _DefaultStreamerAcct.OpListStreamerAcctFuzzilyByName(ctx, &streameracctproto.OpListFuzzilyByNameReq{ Name: req.Name, - Role: goproto.Int64(consts.Streamer), Offset: req.Offset, Limit: req.Limit, Sort: []string{"_id"}, @@ -1285,9 +1286,8 @@ func (s *Service) ApiGetStreamerExtListFuzzilyByName(ctx *gin.Context, req *stre //2.获取mids mids := make([]int64, 0) - for _, v := range accountList { - mid := util.DerefInt64(v.Mid) - mids = append(mids, mid) + for _, v := range streameraccts { + mids = append(mids, v.Mid) } mp, err := s.utilGetStreamerExtMapByMids(ctx, mids, consts.InterfaceType_Api) diff --git a/app/mix/service/logic/account.go b/app/mix/service/logic/account.go index 0919c9d1..56d2c3ea 100644 --- a/app/mix/service/logic/account.go +++ b/app/mix/service/logic/account.go @@ -152,24 +152,6 @@ func (p *Account) OpListByUserId(ctx *gin.Context, req *accountproto.OpListByUse return list, nil } -func (p *Account) OpListStreamerAcctFuzzilyByUserId(ctx *gin.Context, req *accountproto.OpListFuzzilyByUserIdReq) ([]*dbstruct.StreamerAcct, error) { - list, err := p.store.GetStreamerAcctListFuzzilyByUserId(ctx, req) - if err != nil { - logger.Error("GetStreamerAcctListFuzzilyByUserId fail, err: %v", err) - return nil, err - } - return list, nil -} - -func (p *Account) OpListStreamerAcctFuzzilyByName(ctx *gin.Context, req *accountproto.OpListFuzzilyByNameReq) ([]*dbstruct.StreamerAcct, error) { - list, err := p.store.GetStreamerAcctListFuzzilyByName(ctx, req) - if err != nil { - logger.Error("GetStreamerAcctListFuzzilyByName fail, err: %v", err) - return nil, err - } - return list, nil -} - func (p *Account) OpUpdateByIds(ctx *gin.Context, req *accountproto.OpUpdateByIdsReq) error { err := p.store.UpdateAccountByIds(ctx, req.Account, req.Ids) if err != nil { @@ -282,22 +264,3 @@ func (p *Account) OpListByPhoneHash(ctx *gin.Context, phonehash string) ([]*dbst } return list, nil } - -func (p *Account) OpCreateStreamerAcct(ctx *gin.Context, accounts []*dbstruct.StreamerAcct) error { - - err := p.store.CreateStreamerAcct(ctx, accounts) - if err != nil { - logger.Error("CreateAccount fail, err: %v", err) - return err - } - return nil -} - -func (p *Account) OpUpdateStreamerAcctName(ctx *gin.Context, mid int64, name string) error { - err := p.store.UpdateStreamerAcctName(ctx, mid, name) - if err != nil { - logger.Error("UpdateAccount fail, err: %v", err) - return err - } - return nil -} diff --git a/app/mix/service/logic/streamer_acct.go b/app/mix/service/logic/streamer_acct.go new file mode 100644 index 00000000..68aa0699 --- /dev/null +++ b/app/mix/service/logic/streamer_acct.go @@ -0,0 +1,58 @@ +package logic + +import ( + "service/app/mix/dao" + "service/dbstruct" + "service/library/logger" + + "github.com/gin-gonic/gin" + + streameracctproto "service/api/proto/streamer_acct/proto" +) + +type StreamerAcct struct { + store *dao.Store +} + +func NewStreamerAcct(store *dao.Store) (a *StreamerAcct) { + a = &StreamerAcct{ + store: store, + } + return +} + +func (p *StreamerAcct) OpCreate(ctx *gin.Context, streameraccts []*dbstruct.EsStreamerAcct) error { + err := p.store.CreateStreamerAcct(ctx, streameraccts) + if err != nil { + logger.Error("CreateStreamerAcct fail, err: %v", err) + return err + } + return nil +} + +func (p *StreamerAcct) OpUpdate(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcct) error { + err := p.store.UpdateStreamerAcct(ctx, streameracct) + if err != nil { + logger.Error("UpdateStreamerAcct fail, err: %v", err) + return err + } + return nil +} + +func (p *StreamerAcct) OpListStreamerAcctFuzzilyByUserId(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByUserIdReq) ([]*dbstruct.EsStreamerAcct, error) { + list, err := p.store.GetStreamerAcctListFuzzilyByUserId(ctx, req) + if err != nil { + logger.Error("GetStreamerAcctListFuzzilyByUserId fail, err: %v", err) + return nil, err + } + return list, nil +} + +func (p *StreamerAcct) OpListStreamerAcctFuzzilyByName(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByNameReq) ([]*dbstruct.EsStreamerAcct, error) { + list, err := p.store.GetStreamerAcctListFuzzilyByName(ctx, req) + if err != nil { + logger.Error("GetStreamerAcctListFuzzilyByName fail, err: %v", err) + return nil, err + } + return list, nil +} diff --git a/app/mix/service/opservice_business_validation.go b/app/mix/service/opservice_business_validation.go index e02924fa..dfeb5045 100644 --- a/app/mix/service/opservice_business_validation.go +++ b/app/mix/service/opservice_business_validation.go @@ -288,42 +288,6 @@ func (s *Service) OpGetMobilePhoneByUserIdBusinessValidate(ctx *gin.Context, req return } -func (s *Service) OpGetAccountListFuzzilyByUserIdBusinessValidate(ctx *gin.Context, req *accountproto.OpListFuzzilyByUserIdReq) (ec errcode.ErrCode) { - ec = errcode.ErrCodeAccountSrvOk - - // 1.业务校验 - result := businessvalidator.NewAuthBusinessValidator(ctx, req). - QueryAccount(_DefaultAccount.OpListByMid). - EnsureAccountExist(). - EnsureIsOpRole(). - Validate(). - Collect() - if ec = result[0].(errcode.ErrCode); ec != errcode.ErrCodeOk { - logger.Error("OpGetAccountListFuzzilyByUserId business validation failed") - return - } - - return -} - -func (s *Service) OpGetAccountListFuzzilyByNameBusinessValidate(ctx *gin.Context, req *accountproto.OpListFuzzilyByNameReq) (ec errcode.ErrCode) { - ec = errcode.ErrCodeAccountSrvOk - - // 1.业务校验 - result := businessvalidator.NewAuthBusinessValidator(ctx, req). - QueryAccount(_DefaultAccount.OpListByMid). - EnsureAccountExist(). - EnsureIsOpRole(). - Validate(). - Collect() - if ec = result[0].(errcode.ErrCode); ec != errcode.ErrCodeOk { - logger.Error("OpGetAccountListFuzzilyByName business validation failed") - return - } - - return -} - func (s *Service) OpGetAccountListForOthersByMiBusinessValidated(ctx *gin.Context, req *accountproto.OpListOthersByMidReq) (ec errcode.ErrCode) { ec = errcode.ErrCodeAccountSrvOk diff --git a/app/mix/service/service.go b/app/mix/service/service.go index 1992411b..ee90a282 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -29,6 +29,7 @@ import ( realname_authenticationproto "service/api/proto/realname_authentication/proto" resourceproto "service/api/proto/resource/proto" streamerproto "service/api/proto/streamer/proto" + streameracctproto "service/api/proto/streamer_acct/proto" streamerauthapprovalproto "service/api/proto/streamerauthapproval/proto" streamerlinkproto "service/api/proto/streamerlink/proto" textaudittaskproto "service/api/proto/textaudittask/proto" @@ -51,7 +52,6 @@ import ( "service/library/contentaudit/imageaudit" "service/library/contentaudit/textaudit" videomoderation "service/library/contentaudit/video_moderation" - elasticsearch "service/library/elastic_search" "service/library/logger" "service/library/melody" "service/library/mycrypto" @@ -124,6 +124,7 @@ var ( _DefaultZoneMomentCreateTimes *logic.ZoneMomentCreateTimes _DefaultVideoModeration *logic.VideoModeration _DefaultVideoModerationTask *logic.VideoModerationTask + _DefaultStreamerAcct *logic.StreamerAcct ) type Service struct { @@ -152,12 +153,6 @@ func (s *Service) Init(c any) (err error) { logger.Error("cryptoService init, err: %v", err) } - err = elasticsearch.Init(cfg.ElasticSearch) - if err != nil { - logger.Error("elasticsearch.Init fail, cfg: %v, err: %v", util.ToJson(cfg.ElasticSearch), err) - return - } - s.defaultMelody = melody.New() err = alipaycli.InitMulti(cfg.Alipay, cfg.AlipayMYTS) @@ -220,6 +215,7 @@ func (s *Service) Init(c any) (err error) { _DefaultVideoModerationTask = logic.NewVideoModerationTask(store) _DefaultVas = logic.NewVas(store, _DefaultStreamer, _DefaultAccount, _DefaultZone, _DefaultZoneThirdPartner, _DefaultZoneCollaborator) + _DefaultStreamerAcct = logic.NewStreamerAcct(store) return } @@ -817,9 +813,12 @@ func (s *Service) OpUpdateAccount(ctx *gin.Context, req *accountproto.OpUpdateRe return } - if oldAccount.GetRole() == consts.Streamer { - if err := _DefaultAccount.OpUpdateStreamerAcctName(ctx, util.DerefInt64(req.Account.Mid), util.DerefString(req.Account.Name)); err != nil { - logger.Error("OpUpdateStreamerAcctName fail, req: %v, err: %v", util.ToJson(req), err) + if oldAccount.GetRole() == consts.Streamer && req.Account.Name != nil { + if err := _DefaultStreamerAcct.OpUpdate(ctx, &dbstruct.EsStreamerAcct{ + Mid: req.Account.GetMid(), + Name: req.Account.GetName(), + }); err != nil { + logger.Error("OpUpdate fail, req: %v, err: %v", util.ToJson(req), err) ec = errcode.ErrCodeAccountSrvFail return } @@ -1773,20 +1772,20 @@ func (s *Service) OpApproveStreamerAuthApproval(ctx *gin.Context, req *streamera ec = errcode.ErrCodeAccountSrvFail return } - streamerAccts := make([]*dbstruct.StreamerAcct, 0) + streamerAccts := make([]*dbstruct.EsStreamerAcct, 0) for _, acct := range accountList { - streamerAccts = append(streamerAccts, &dbstruct.StreamerAcct{ - Mid: acct.Mid, - Name: acct.Name, - UserIdString: acct.UserIdString, - Ct: acct.Ct, - Ut: acct.Ut, - DelFlag: acct.DelFlag, + streamerAccts = append(streamerAccts, &dbstruct.EsStreamerAcct{ + Mid: acct.GetMid(), + Name: acct.GetName(), + UserIdString: acct.GetUserIdString(), + Ct: acct.GetCt(), + Ut: acct.GetUt(), + DelFlag: acct.GetDelFlag(), }) } - err = _DefaultAccount.OpCreateStreamerAcct(ctx, streamerAccts) + err = _DefaultStreamerAcct.OpCreate(ctx, streamerAccts) if err != nil { - logger.Error("_DefaultAccount OpCreateStreamerAcct fail, req: %v, err: %v", util.ToJson(req), err) + logger.Error("_DefaultStreamerAcct OpCreate fail, req: %v, err: %v", util.ToJson(req), err) ec = errcode.ErrCodeAccountSrvFail return } @@ -1991,11 +1990,10 @@ func (s *Service) OpGetStreamerExtListFuzzilyByUserId(ctx *gin.Context, req *str } //1.从主播用户表中模糊匹配所有主播信息的用户侧数据,按mid正序排序 - accountList, err := _DefaultAccount.OpListStreamerAcctFuzzilyByUserId(ctx, &accountproto.OpListFuzzilyByUserIdReq{ - UserId: req.UserId, - Role: goproto.Int64(consts.Streamer), - Offset: req.Offset, - Limit: req.Limit, + streameraccts, err := _DefaultStreamerAcct.OpListStreamerAcctFuzzilyByUserId(ctx, &streameracctproto.OpListFuzzilyByUserIdReq{ + UserIdString: fmt.Sprint(util.DerefInt64(req.UserId)), + Offset: req.Offset, + Limit: req.Limit, }) if err != nil { logger.Error("Account OpListFuzzilyByUserId fail, req: %v, err: %v", util.ToJson(req), err) @@ -2005,9 +2003,8 @@ func (s *Service) OpGetStreamerExtListFuzzilyByUserId(ctx *gin.Context, req *str //2.获取mids mids := make([]int64, 0) - for _, v := range accountList { - mid := util.DerefInt64(v.Mid) - mids = append(mids, mid) + for _, v := range streameraccts { + mids = append(mids, v.Mid) } mp, err := s.utilGetStreamerExtMapByMids(ctx, mids, consts.InterfaceType_Op) @@ -2033,9 +2030,8 @@ func (s *Service) OpGetStreamerExtListFuzzilyByName(ctx *gin.Context, req *strea } //1.从主播用户表中模糊匹配所有主播信息的用户侧数据,按mid正序排序 - accountList, err := _DefaultAccount.OpListStreamerAcctFuzzilyByName(ctx, &accountproto.OpListFuzzilyByNameReq{ + streameraccts, err := _DefaultStreamerAcct.OpListStreamerAcctFuzzilyByName(ctx, &streameracctproto.OpListFuzzilyByNameReq{ Name: req.Name, - Role: goproto.Int64(consts.Streamer), Offset: req.Offset, Limit: req.Limit, }) @@ -2047,9 +2043,8 @@ func (s *Service) OpGetStreamerExtListFuzzilyByName(ctx *gin.Context, req *strea //2.获取mids mids := make([]int64, 0) - for _, v := range accountList { - mid := util.DerefInt64(v.Mid) - mids = append(mids, mid) + for _, v := range streameraccts { + mids = append(mids, v.Mid) } mp, err := s.utilGetStreamerExtMapByMids(ctx, mids, consts.InterfaceType_Op) diff --git a/app/mix/service/textaudittask_result_handler.go b/app/mix/service/textaudittask_result_handler.go index fbc2db38..fce0b8f9 100644 --- a/app/mix/service/textaudittask_result_handler.go +++ b/app/mix/service/textaudittask_result_handler.go @@ -92,7 +92,10 @@ func (handler *TextAuditTaskResultHandler) generateAccountNameUpdateFunc() { } if acct.GetRole() == consts.Streamer { - return _DefaultAccount.OpUpdateStreamerAcctName(ctx, util.DerefInt64(mid), util.DerefString(name)) + return _DefaultStreamerAcct.OpUpdate(ctx, &dbstruct.EsStreamerAcct{ + Mid: util.DerefInt64(mid), + Name: util.DerefString(name), + }) } return nil diff --git a/dbstruct/account.go b/dbstruct/account.go index 500003d1..491f0b9e 100644 --- a/dbstruct/account.go +++ b/dbstruct/account.go @@ -32,6 +32,48 @@ type Account struct { DelFlag *int64 `json:"del_flag" bson:"del_flag"` // 删除标记,0-否,1-是 } +func (p *Account) GetMid() int64 { + if p == nil || p.Mid == nil { + return -1 + } + return *p.Mid +} + +func (p *Account) GetName() string { + if p == nil || p.Name == nil { + return "" + } + return *p.Name +} + +func (p *Account) GetUserIdString() string { + if p == nil || p.UserIdString == nil { + return "" + } + return *p.UserIdString +} + +func (p *Account) GetCt() int64 { + if p == nil || p.Ct == nil { + return 0 + } + return *p.Ct +} + +func (p *Account) GetUt() int64 { + if p == nil || p.Ut == nil { + return 0 + } + return *p.Ut +} + +func (p *Account) GetDelFlag() int64 { + if p == nil || p.DelFlag == nil { + return 0 + } + return *p.DelFlag +} + func (p *Account) GetRole() int64 { if p == nil || p.Role == nil { return -1 diff --git a/etc/mix/mix-prod.yaml b/etc/mix/mix-prod.yaml index 72a86c4b..411175b6 100644 --- a/etc/mix/mix-prod.yaml +++ b/etc/mix/mix-prod.yaml @@ -27,7 +27,6 @@ mix_mysql: read_timeout_s: 5 write_timeout_s: 3 - crypto: aes: private_key: "Xbz1145141919810" @@ -143,4 +142,5 @@ elastic_search: uri: "http://172.31.37.67:9200" username: "elastic" password: "Wishpal@2024" - sniff: false \ No newline at end of file + sniff: false + index_suffix: "_prod" \ No newline at end of file diff --git a/etc/mix/mix-test.yaml b/etc/mix/mix-test.yaml index 4e6a8d10..7234c8c8 100644 --- a/etc/mix/mix-test.yaml +++ b/etc/mix/mix-test.yaml @@ -143,4 +143,5 @@ elastic_search: uri: "http://172.31.37.67:9200" username: "elastic" password: "Wishpal@2024" - sniff: false \ No newline at end of file + sniff: false + index_suffix: "_test" \ No newline at end of file diff --git a/library/configcenter/configcenter.go b/library/configcenter/configcenter.go index dd0a4d98..e38dc496 100644 --- a/library/configcenter/configcenter.go +++ b/library/configcenter/configcenter.go @@ -167,10 +167,11 @@ type XxlJobConfig struct { // elasticsearch服务配置 type ElasticSearchConfig struct { - Uri string `json:"uri" yaml:"uri"` // 实例地址 - Username string `json:"username" yaml:"username"` // 用户名 - Password string `json:"password" yaml:"password"` // 密码 - Sniff bool `json:"sniff" yaml:"sniff"` + Uri string `json:"uri" yaml:"uri"` // 实例地址 + Username string `json:"username" yaml:"username"` // 用户名 + Password string `json:"password" yaml:"password"` // 密码 + Sniff bool `json:"sniff" yaml:"sniff"` // sniffer + IndexSuffix string `json:"index_suffix" yaml:"index_suffix"` // 索引后缀 } func LoadConfig(configFilePath string, cfg interface{}) error { diff --git a/library/elasticsearchdb/client.go b/library/elasticsearchdb/client.go index f93f850b..98ffb1b6 100644 --- a/library/elasticsearchdb/client.go +++ b/library/elasticsearchdb/client.go @@ -8,7 +8,7 @@ import ( "github.com/olivere/elastic/v7" ) -func Init(cfg *configcenter.ElasticSearchConfig) (client *elastic.Client, err error) { +func NewElasticSearchClient(cfg *configcenter.ElasticSearchConfig) (client *elastic.Client, err error) { client, err = elastic.NewClient( elastic.SetURL(cfg.Uri), elastic.SetSniff(cfg.Sniff),