package importfunc import ( "context" "fmt" "strings" "time" "github.com/Leufolium/test/dbstruct" "github.com/Leufolium/test/es" "github.com/Leufolium/test/mongo" "github.com/Leufolium/test/util" "github.com/mozillazg/go-pinyin" ) func ImportStreamerIntoEs() { fmt.Println("Start importing...") mcli, err := mongo.NewMongo() if err != nil { fmt.Printf("mongo client init fail : %v", err) return } escli, err := es.NewElasticSearch() if err != nil { fmt.Printf("elastic search client init fail : %v", err) return } ctx := context.Background() limit := 1000 offset := 0 streamers, err := mcli.GetStreamerList(ctx, offset, limit) if err != nil { fmt.Printf("GetStreamerListfail : %v", err) return } mids := make([]int64, 0) for _, streamer := range streamers { mids = append(mids, streamer.GetMid()) } accounts, _ := mcli.GetAccountListByMids(ctx, mids) acctMp := make(map[int64]*dbstruct.Account) for _, acct := range accounts { acctMp[acct.GetMid()] = acct } zones, _ := mcli.GetZoneListByMids(ctx, mids) zoneMp := make(map[int64]*dbstruct.Zone) for _, zone := range zones { zoneMp[zone.GetMid()] = zone } streameraccts := make([]*dbstruct.EsStreamerAcct, 0) for _, streamer := range streamers { zone, ok := zoneMp[streamer.GetMid()] lastZoneMomentCreateDay := int64(0) if ok && zone.GetLastZoneMomentCt() != 0 { lastZoneMomentCreateDay = util.GetDayStartTimeStamp(time.Unix(zone.GetLastZoneMomentCt(), 0)) } name := "" py := "" userIdString := "" acct, ok := acctMp[streamer.GetMid()] if ok { name = acct.GetName() py = strings.Join(pinyin.LazyConvert(acct.GetName(), nil), "") userIdString = util.DerefString(acct.UserIdString) } streameraccts = append(streameraccts, &dbstruct.EsStreamerAcct{ Mid: streamer.GetMid(), Name: name, UserIdString: userIdString, PinYin: py, Age: util.DerefInt64(streamer.Age), Height: util.DerefInt64(streamer.Height), Weight: util.DerefInt64(streamer.Weight), City: util.DerefString(streamer.City), Constellation: util.DerefString(streamer.Constellation), LastZoneMomentCreateDayStart: lastZoneMomentCreateDay, Ct: time.Now().Unix(), Ut: time.Now().Unix(), DelFlag: 0, }) } err = escli.CreateStreamerAcct(ctx, streameraccts) if err != nil { fmt.Printf("CreateStreamerAcct : %v", err) return } fmt.Println("End importing...") }