dataprep/importfunc/streamer_into_es.go

131 lines
3.6 KiB
Go

package importfunc
import (
"context"
"fmt"
"strings"
"time"
"github.com/Leufolium/test/dbstruct"
"github.com/Leufolium/test/es"
"github.com/Leufolium/test/mongo"
"github.com/Leufolium/test/util"
"github.com/mozillazg/go-pinyin"
)
// ImportStreamerIntoEs reads one page of streamers from MongoDB, joins each
// streamer with its account, zone, zone VAS and user VAS records (all keyed
// by mid, or by zone id for zone VAS), and hands the resulting
// EsStreamerAcct documents to the Elasticsearch client in a single
// CreateStreamerAcct call.
//
// Progress and errors are reported on standard output. Any failure (client
// initialisation, one of the lookups, or the final write) aborts the import,
// so nothing is sent to Elasticsearch unless every lookup succeeded.
//
// NOTE(review): only offset 0 with limit 1000 is ever requested, so streamers
// beyond that first page are presumably never imported. Confirm whether the
// call should be paginated.
func ImportStreamerIntoEs() {
	fmt.Println("Start importing...")

	mcli, err := mongo.NewMongo()
	if err != nil {
		fmt.Printf("mongo client init fail : %v", err)
		return
	}
	escli, err := es.NewElasticSearch()
	if err != nil {
		fmt.Printf("elastic search client init fail : %v", err)
		return
	}

	ctx := context.Background()
	limit := 1000
	offset := 0
	streamers, err := mcli.GetStreamerList(ctx, offset, limit)
	if err != nil {
		fmt.Printf("GetStreamerListfail : %v", err)
		return
	}

	mids := make([]int64, 0, len(streamers))
	for _, streamer := range streamers {
		mids = append(mids, streamer.GetMid())
	}

	// The lookups below must not fail silently: a swallowed error would leave
	// the corresponding map empty and every document would be indexed with
	// placeholder values (empty name, zero or -999999 prices).
	accounts, err := mcli.GetAccountListByMids(ctx, mids)
	if err != nil {
		fmt.Printf("GetAccountListByMids fail : %v\n", err)
		return
	}
	acctMp := make(map[int64]*dbstruct.Account, len(accounts))
	for _, acct := range accounts {
		acctMp[acct.GetMid()] = acct
	}

	zones, err := mcli.GetZoneListByMids(ctx, mids)
	if err != nil {
		fmt.Printf("GetZoneListByMids fail : %v\n", err)
		return
	}
	zoneMp := make(map[int64]*dbstruct.Zone, len(zones))
	for _, zone := range zones {
		zoneMp[zone.GetMid()] = zone
	}

	userInfos, err := mcli.GetUserVasInfoByMids(ctx, mids)
	if err != nil {
		fmt.Printf("GetUserVasInfoByMids fail : %v\n", err)
		return
	}
	userInfoMp := make(map[int64]*dbstruct.UserVasInfo, len(userInfos))
	for _, userInfo := range userInfos {
		userInfoMp[userInfo.Mid] = userInfo
	}

	// Zone VAS records are keyed by zone id, so collect the ids of the zones
	// found above (and echo them for debugging).
	zids := make([]int64, 0, len(zones))
	fmt.Printf("zid: [")
	for _, zone := range zones {
		zids = append(zids, zone.GetId())
		fmt.Printf("%v ", zone.GetId())
	}
	fmt.Printf("]")

	zoneVases, err := mcli.GetZoneVasByIds(ctx, zids)
	if err != nil {
		fmt.Printf("GetZoneVasByIds fail : %v\n", err)
		return
	}
	zoneVasMp := make(map[int64]*dbstruct.ZoneVas, len(zoneVases))
	for _, zoneVas := range zoneVases {
		zoneVasMp[zoneVas.Zid] = zoneVas
	}

	streameraccts := make([]*dbstruct.EsStreamerAcct, 0, len(streamers))
	fmt.Printf("price: [")
	for _, streamer := range streamers {
		mid := streamer.GetMid()

		// Defaults used when the streamer has no zone, or the zone has no
		// recorded moment: day start 0 and the sentinel price -999999.
		lastZoneMomentCreateDay := int64(0)
		zoneadmissionprice := int64(-999999)
		// The admission price is only looked up for zones whose last-moment
		// timestamp is non-zero.
		if zone, ok := zoneMp[mid]; ok && zone.GetLastZoneMomentCt() != 0 {
			lastZoneMomentCreateDay = util.GetDayStartTimeStamp(time.Unix(zone.GetLastZoneMomentCt(), 0))
			if zoneVas, ok := zoneVasMp[zone.GetId()]; ok {
				zoneadmissionprice = zoneVas.AdmissionPrice
				fmt.Printf("%v ", zoneadmissionprice)
			}
		}

		// Account-derived fields stay empty when no account matches the mid.
		name := ""
		py := ""
		userIdString := ""
		if acct, ok := acctMp[mid]; ok {
			name = acct.GetName()
			py = strings.Join(pinyin.LazyConvert(acct.GetName(), nil), "")
			userIdString = util.DerefString(acct.UserIdString)
		}

		wechatcoinprice := int64(0)
		if userInfo, ok := userInfoMp[mid]; ok {
			wechatcoinprice = userInfo.WechatCoinPrice
		}

		// Read the clock once so Ct and Ut are always identical for a newly
		// created document.
		now := time.Now().Unix()
		streameraccts = append(streameraccts, &dbstruct.EsStreamerAcct{
			Mid:                          mid,
			Name:                         name,
			UserIdString:                 userIdString,
			PinYin:                       py,
			Age:                          util.DerefInt64(streamer.Age),
			Height:                       util.DerefInt64(streamer.Height),
			Weight:                       util.DerefInt64(streamer.Weight),
			Fans:                         util.DerefInt64(streamer.Fans),
			City:                         util.DerefString(streamer.City),
			Constellation:                util.DerefString(streamer.Constellation),
			LastZoneMomentCreateDayStart: lastZoneMomentCreateDay,
			WechatCoinPrice:              wechatcoinprice,
			ZoneAdmissionPrice:           zoneadmissionprice,
			Ct:                           now,
			Ut:                           now,
			DelFlag:                      0,
		})
	}
	fmt.Printf("]")

	err = escli.CreateStreamerAcct(ctx, streameraccts)
	if err != nil {
		fmt.Printf("CreateStreamerAcct : %v", err)
		return
	}
	fmt.Println("End importing...")
}