package importfunc import ( "context" "fmt" "strings" "time" "github.com/Leufolium/test/dbstruct" "github.com/Leufolium/test/es" "github.com/Leufolium/test/mongo" "github.com/Leufolium/test/util" "github.com/mozillazg/go-pinyin" ) func ImportStreamerIntoEs() { fmt.Println("Start importing...") mcli, err := mongo.NewMongo() if err != nil { fmt.Printf("mongo client init fail : %v", err) return } escli, err := es.NewElasticSearch() if err != nil { fmt.Printf("elastic search client init fail : %v", err) return } ctx := context.Background() limit := 1000 offset := 0 streamers, err := mcli.GetStreamerList(ctx, offset, limit) if err != nil { fmt.Printf("GetStreamerListfail : %v", err) return } mids := make([]int64, 0) for _, streamer := range streamers { mids = append(mids, streamer.GetMid()) } accounts, _ := mcli.GetAccountListByMids(ctx, mids) acctMp := make(map[int64]*dbstruct.Account) for _, acct := range accounts { acctMp[acct.GetMid()] = acct } zones, _ := mcli.GetZoneListByMids(ctx, mids) zoneMp := make(map[int64]*dbstruct.Zone) for _, zone := range zones { zoneMp[zone.GetMid()] = zone } userInfos, _ := mcli.GetUserVasInfoByMids(ctx, mids) userInfoMp := make(map[int64]*dbstruct.UserVasInfo) for _, userInfo := range userInfos { userInfoMp[userInfo.Mid] = userInfo } zids := make([]int64, 0) fmt.Printf("zid: [") for _, zone := range zones { zids = append(zids, zone.GetId()) fmt.Printf("%v ", zone.GetId()) } fmt.Printf("]") zoneVases, _ := mcli.GetZoneVasByIds(ctx, zids) zoneVasMp := make(map[int64]*dbstruct.ZoneVas) for _, zoneVas := range zoneVases { zoneVasMp[zoneVas.Zid] = zoneVas } streameraccts := make([]*dbstruct.EsStreamerAcct, 0) fmt.Printf("price: [") for _, streamer := range streamers { zone, ok := zoneMp[streamer.GetMid()] lastZoneMomentCreateDay := int64(0) zoneadmissionprice := int64(-1) if ok && zone.GetLastZoneMomentCt() != 0 { lastZoneMomentCreateDay = util.GetDayStartTimeStamp(time.Unix(zone.GetLastZoneMomentCt(), 0)) zoneVas, ok1 := zoneVasMp[zone.GetId()] if ok1 { zoneadmissionprice = zoneVas.AdmissionPrice fmt.Printf("%v ", zoneadmissionprice) } } name := "" py := "" userIdString := "" acct, ok := acctMp[streamer.GetMid()] if ok { name = acct.GetName() py = strings.Join(pinyin.LazyConvert(acct.GetName(), nil), "") userIdString = util.DerefString(acct.UserIdString) } wechatcoinprice := int64(0) userInfo, ok := userInfoMp[streamer.GetMid()] if ok { wechatcoinprice = userInfo.WechatCoinPrice } streameraccts = append(streameraccts, &dbstruct.EsStreamerAcct{ Mid: streamer.GetMid(), Name: name, UserIdString: userIdString, PinYin: py, Age: util.DerefInt64(streamer.Age), Height: util.DerefInt64(streamer.Height), Weight: util.DerefInt64(streamer.Weight), City: util.DerefString(streamer.City), Constellation: util.DerefString(streamer.Constellation), LastZoneMomentCreateDayStart: lastZoneMomentCreateDay, WechatCoinPrice: wechatcoinprice, ZoneAdmissionPrice: zoneadmissionprice, Ct: time.Now().Unix(), Ut: time.Now().Unix(), DelFlag: 0, }) } fmt.Printf("]") err = escli.CreateStreamerAcct(ctx, streameraccts) if err != nil { fmt.Printf("CreateStreamerAcct : %v", err) return } fmt.Println("End importing...") }