// Package es wraps the Elasticsearch calls used for the streamer account
// index.
package es

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"

	"github.com/Leufolium/test/dbstruct"
	"github.com/olivere/elastic/v7"
)

// ElasticSearch groups the streamer account operations that share one
// Elasticsearch client.
type ElasticSearch struct {
	clientMix *elastic.Client
}

// NewElasticSearch returns an ElasticSearch backed by the client produced by
// NewElasticSearchClient. When the client cannot be created it returns a nil
// *ElasticSearch together with the error, so callers never receive a
// half-initialised value.
func NewElasticSearch() (*ElasticSearch, error) {
	client, err := NewElasticSearchClient()
	if err != nil {
		return nil, err
	}
	return &ElasticSearch{clientMix: client}, nil
}

const (
	// IndexStreamerAcct is the name of the index holding streamer accounts.
	IndexStreamerAcct = "streamer_acct"
	// TypeStreamerAcct is the mapping type name associated with the index.
	TypeStreamerAcct = "_doc"
)

// getIndexStreamerAcct returns the index name used by every streamer account
// operation.
func (es *ElasticSearch) getIndexStreamerAcct() string {
	return IndexStreamerAcct
}

// CreateStreamerAcct indexes the given accounts with a single bulk request,
// using each account's Mid as the document ID and forcing a refresh.
//
// An empty slice is a no-op. A nil entry is rejected before anything is sent.
// An error is returned when the request itself fails or when Elasticsearch
// reports that at least one item was not created (for example because the ID
// already exists); successfully created items of the same batch stay indexed.
//
// Errors coming from the Elasticsearch client are returned unwrapped so that
// helpers such as elastic.IsConflict keep working on them.
func (es *ElasticSearch) CreateStreamerAcct(ctx context.Context, streameraccts []*dbstruct.EsStreamerAcct) error {
	// The bulk service refuses to send a request without actions, so an empty
	// batch is handled here instead of surfacing as an error.
	if len(streameraccts) == 0 {
		return nil
	}

	bulk := es.clientMix.Bulk().Index(es.getIndexStreamerAcct()).Refresh("true")
	for i, acct := range streameraccts {
		if acct == nil {
			return fmt.Errorf("create streamer accounts: nil entry at position %d", i)
		}
		bulk.Add(elastic.NewBulkCreateRequest().Id(fmt.Sprint(acct.Mid)).Doc(acct))
	}

	resp, err := bulk.Do(ctx)
	if err != nil {
		return err
	}

	// A bulk call succeeds at the transport level even when individual items
	// are rejected; those failures are only visible in the response body.
	if failed := resp.Failed(); len(failed) > 0 {
		first := failed[0]
		reason := "unknown reason"
		if first.Error != nil {
			reason = first.Error.Reason
		}
		return fmt.Errorf(
			"create streamer accounts: %d of %d items failed; first failure: id %q, status %d: %s",
			len(failed), len(streameraccts), first.Id, first.Status, reason,
		)
	}
	return nil
}

// UpdateStreamerAcct partially updates the document whose ID is the account's
// Mid, writing only the "name", "pinyin" and "ut" fields.
//
// A nil account is rejected. Errors coming from the Elasticsearch client are
// returned unwrapped so that helpers such as elastic.IsNotFound keep working
// on them.
func (es *ElasticSearch) UpdateStreamerAcct(ctx context.Context, streameracct *dbstruct.EsStreamerAcct) error {
	if streameracct == nil {
		return errors.New("update streamer account: nil account")
	}

	fields := map[string]any{
		"name":   streameracct.Name,
		"pinyin": streameracct.PinYin,
		"ut":     streameracct.Ut,
	}
	_, err := es.clientMix.Update().
		Index(es.getIndexStreamerAcct()).
		Id(fmt.Sprint(streameracct.Mid)).
		Doc(fields).
		Do(ctx)
	return err
}

// GetStreamerAcctList returns every document of the streamer account index.
//
// The documents are fetched page by page through the scroll API: a plain
// search without an explicit size only returns the first ten hits. The whole
// result is held in memory. The returned slice is non-nil even when the index
// is empty; on any error the slice is nil.
func (es *ElasticSearch) GetStreamerAcctList(ctx context.Context) ([]*dbstruct.EsStreamerAcct, error) {
	// Number of documents requested per scroll round trip.
	const pageSize = 1000

	scroll := es.clientMix.Scroll(es.getIndexStreamerAcct()).
		Query(elastic.NewMatchAllQuery()).
		Size(pageSize)
	defer func() {
		// Best-effort release of the server-side scroll context. The outcome
		// of the listing is already decided at this point and the context
		// expires on its own, so a failure here is deliberately ignored.
		_ = scroll.Clear(ctx)
	}()

	list := make([]*dbstruct.EsStreamerAcct, 0)
	for {
		res, err := scroll.Do(ctx)
		if errors.Is(err, io.EOF) {
			// The scroll service signals "no more hits" with io.EOF.
			return list, nil
		}
		if err != nil {
			return nil, err
		}

		for _, hit := range res.Hits.Hits {
			acct := &dbstruct.EsStreamerAcct{}
			if err := json.Unmarshal(hit.Source, acct); err != nil {
				return nil, fmt.Errorf("decode streamer account %q: %w", hit.Id, err)
			}
			list = append(list, acct)
		}
	}
}