From b63febdd669906895a627e17dd4498652af2e22d Mon Sep 17 00:00:00 2001 From: Leufolium Date: Thu, 15 Aug 2024 04:56:58 +0800 Subject: [PATCH] 1 --- cmd/main.go | 708 +------------------------------------ cmd/streamer_into_es.go | 95 +++++ dbstruct/auditcomponent.go | 20 ++ dbstruct/streamer.go | 59 ++++ dbstruct/streamer_acct.go | 20 +- mongo/mongo.go | 23 ++ util/ptrreader.go | 106 ++++++ util/util.go | 20 ++ 8 files changed, 337 insertions(+), 714 deletions(-) create mode 100644 cmd/streamer_into_es.go create mode 100644 dbstruct/auditcomponent.go create mode 100644 dbstruct/streamer.go create mode 100644 util/ptrreader.go create mode 100644 util/util.go diff --git a/cmd/main.go b/cmd/main.go index fb0d751..334bcb7 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,711 +1,5 @@ package main -import ( - "bufio" - "context" - "encoding/base64" - "fmt" - "os" - - myCrypto "github.com/Leufolium/test/crypto" - "github.com/Leufolium/test/dbstruct" - "github.com/Leufolium/test/mongo" - "github.com/Leufolium/test/mysql" -) - func main() { - fmt.Println("Start importing...") - - cryptoService := new(myCrypto.CryptoService) - cryptoService.Init() - - client, err := mongo.NewMongo() - if err != nil { - fmt.Printf("mongo client init fail : %v", err) - return - } - - mysqlcli, err := mysql.NewMysql() - if err != nil { - fmt.Printf("mysql client init fail : %v", err) - return - } - - ctx := context.Background() - mids := []int64{ - 3, - 4, - 5, - 6, - 7, - 8, - 9, - 10, - 11, - 12, - 14, - 15, - 16, - 17, - 19, - 20, - 21, - 22, - 23, - 24, - 25, - 26, - 27, - 28, - 29, - 30, - 31, - 34, - 35, - 36, - 37, - 38, - 44, - 48, - 50, - 58, - 61, - 62, - 63, - 64, - 65, - 66, - 67, - 68, - 69, - 71, - 72, - 73, - 74, - 75, - 76, - 77, - 78, - 79, - 81, - 82, - 83, - 85, - 87, - 88, - 89, - 91, - 92, - 93, - 94, - 95, - 96, - 97, - 98, - 99, - 100, - 101, - 102, - 103, - 104, - 105, - 106, - 107, - 109, - 110, - 111, - 112, - 113, - 115, - 116, - 117, - 118, - 119, - 120, - 121, - 122, - 123, - 124, - 125, - 127, - 131, - 132, - 133, - 134, - 135, - 136, - 137, - 138, - 139, - 140, - 141, - 142, - 144, - 145, - 146, - 147, - 148, - 149, - 150, - 151, - 159, - 245, - 283, - 447, - 493, - 501, - 507, - 750, - 779, - 876, - 897, - 898, - 996, - 1071, - 1188, - 1258, - 1259, - 1336, - 1365, - 1471, - 1624, - 1649, - 1740, - 1832, - 1850, - 1958, - 1982, - 2391, - 2432, - 2508, - 2608, - 2634, - 2686, - 2895, - 2953, - 2997, - 3169, - 3343, - 3364, - 3367, - 3489, - 3536, - 3940, - 4474, - 4497, - 4865, - 5051, - 5161, - 5545, - 5620, - 5681, - 6069, - 6145, - 6520, - 6856, - 7520, - 7875, - 8184, - 9429, - 10351, - 11146, - 11583, - 12771, - 12955, - 13208, - 13324, - 13820, - 13862, - 15851, - 15949, - 15966, - 16311, - 18370, - 18488, - 19230, - 19316, - 20475, - 21951, - 22358, - 23152, - 24457, - 24582, - 25058, - 25114, - 25315, - 26283, - 29980, - 31476, - 31551, - 33487, - 34365, - 35042, - 35373, - 36032, - 36540, - 37567, - 37634, - 37939, - 38971, - 39190, - 40441, - 42720, - 46302, - 48900, - 49436, - 51444, - 51609, - 52499, - 54525, - 55108, - 57245, - 57272, - 60553, - 60978, - 61060, - 65028, - 66771, - 66930, - 66961, - 67378, - 67379, - 67383, - 67420, - 68283, - 68559, - 69379, - 70822, - 71287, - 76278, - 76723, - 77377, - 79390, - 79843, - 80077, - 82391, - 82414, - 84663, - 85910, - 87404, - 87907, - 89818, - 90281, - 90356, - 91236, - 91856, - 93963, - 97359, - 97857, - 99796, - 100601, - 103920, - 105604, - 143961, - 144384, - 145309, - 145740, - 145806, - 146166, - 146340, - 146460, - 146545, - 147322, - 148509, - 152001, - 153632, - 158395, - 159566, - 161609, - 162710, - 162816, - 163374, - 163608, - 163660, - 164533, - 164630, - 166037, - 167340, - 169095, - 172477, - 172731, - 173051, - 173304, - 174440, - 175834, - 176342, - 177637, - 178210, - 179782, - 180889, - 182308, - 184293, - 185893, - 187760, - 190618, - 191324, - 192434, - 192549, - 193781, - 194448, - 195763, - 197811, - 199188, - 202826, - 204372, - 205940, - 206164, - 206580, - 208778, - 209473, - 210852, - 212561, - 213014, - 213554, - 214911, - 215008, - 215135, - 215240, - 217606, - 217860, - 218851, - 220411, - 226924, - 227196, - 228265, - 228359, - 229743, - 230965, - 239368, - 240225, - 242263, - 243225, - 243875, - 249042, - 252780, - 254325, - 254587, - 258846, - 259111, - 259476, - 263758, - 263921, - 264877, - 267713, - 267884, - 268493, - 268954, - 269339, - 269583, - 269632, - 270017, - 270232, - 270602, - 270888, - 273575, - 274697, - 275013, - 275732, - 276039, - 276071, - 276120, - 276466, - 279590, - 280318, - 280544, - 293390, - 293753, - 296457, - 296564, - 300771, - 301759, - 302200, - 308732, - 308873, - 310356, - 311294, - 312807, - 317098, - 317333, - 320610, - 322645, - 323517, - 326483, - 327391, - 328717, - 329947, - 330130, - 330205, - 330649, - 335318, - 335373, - 335577, - 335736, - 337587, - 337626, - 337829, - 338334, - 338605, - 341103, - 341351, - 341834, - 342076, - 342468, - 342969, - 344012, - 344664, - 344844, - 345300, - 345521, - 345859, - 346732, - 347900, - 348187, - 348500, - 348616, - 348991, - 349805, - 351513, - 352007, - 352353, - 352787, - 352921, - 352924, - 353051, - 353654, - 353681, - 353921, - 354058, - 354125, - 354236, - 355229, - 356071, - 356408, - 356641, - 356737, - 357096, - 357146, - 357181, - 357669, - 358886, - 359048, - 359406, - 360080, - 360323, - 360412, - 361340, - 365051, - 366194, - 367375, - 367849, - 370070, - 370793, - 371145, - 371600, - 373789, - 374749, - 376093, - 376243, - 377767, - 378027, - 378518, - 378891, - 379005, - 379253, - 380316, - 380647, - 380765, - 381577, - 382101, - 382874, - 383225, - 386910, - 387243, - 388957, - 389643, - 389928, - 390102, - 391886, - 392278, - 392896, - 393152, - 393298, - 393529, - 396777, - 397554, - 399206, - 399212, - 399876, - 401922, - 402096, - 404583, - 404820, - 406241, - 406445, - 408607, - 408708, - 408893, - 409130, - 409145, - 409201, - 410700, - 412300, - 412533, - 412764, - 412769, - 414419, - 414568, - 414570, - 414996, - 416251, - 418307, - 420413, - 421224, - 421229, - 421408, - 421498, - 421672, - 421876, - 421951, - 422423, - 424452, - 425002, - 425180, - 425534, - 425674, - 426274, - 426738, - 427317, - 429542, - 429897, - 430335, - 430734, - 431977, - 432362, - 432447, - 432893, - 432959, - 435296, - 436833, - 436883, - 437166, - 437347, - 437450, - 437526, - 437934, - 437955, - 438312, - 438643, - 439926, - 440514, - 440977, - 441008, - 441744, - 441793, - 441883, - 443406, - 445007, - 445068, - 445162, - 445215, - 445553, - 445716, - 445860, - 446143, - 446204, - 446887, - 447665, - 447925, - 449525, - 449933, - 450124, - 450203, - 450388, - 450488, - 450739, - 450832, - 450834, - 451329, - 451333, - 451444, - 451513, - 451771, - 454716, - 454721, - 455290, - 455709, - 456325, - 456385, - 456449, - 456473, - 456921, - 457540, - 458707, - 459369, - 459482, - 459716, - 459920, - 460252, - 460716, - } - - accounts, err := client.GetAccountListByMids(ctx, mids) - if err != nil { - fmt.Printf("GetAccountListByMids fail, err :%v\n", err) - } - - zones, err := client.GetZoneListByMids(ctx, mids) - if err != nil { - fmt.Printf("GetZoneListByMids fail, err :%v\n", err) - } - - outfilePath := "/app/dataprep/outfile.txt" - outfile, err := os.OpenFile(outfilePath, os.O_WRONLY|os.O_CREATE, 0666) - if err != nil { - fmt.Printf("Open File Err : %v", err) - } - defer outfile.Close() - writer := bufio.NewWriter(outfile) - - zoneMp := make(map[int64]*dbstruct.Zone, 0) - zids := make([]int64, 0) - for _, zone := range zones { - zoneMp[zone.GetMid()] = zone - zids = append(zids, zone.GetId()) - } - - zcs, err := mysqlcli.GetZoneMemberCountGroupByZid(ctx, nil, zids) - if err != nil { - fmt.Printf("GetZoneMemberCountGroupByZid Err : %v", err) - } - zcMp := make(map[int64]int64, 0) - for _, zc := range zcs { - zcMp[zc.GetZid()] = zc.GetNum() - } - - for _, account := range accounts { - base64DecryptedBytes, _ := base64.StdEncoding.DecodeString(*account.MobilePhone) - phone, err := cryptoService.DecryptByAES(base64DecryptedBytes) - if err != nil { - fmt.Printf("DecryptByAES err :%v\n", err) - } - phoneStr := string(phone) - mid := account.GetMid() - name := account.GetName() - if name == "账号已注销" { - continue - } - userId := account.GetUserId() - count := int64(0) - membercount := int64(0) - zone, ok := zoneMp[mid] - if ok { - count = zone.GetZoneMomentCount() - membercount = zcMp[zone.GetId()] - } - str := fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\n", mid, name, userId, phoneStr, count, membercount) - fmt.Printf("%v", str) - writer.WriteString(str) - } - - fmt.Println("End importing...") + ImportStreamerIntoEs() } diff --git a/cmd/streamer_into_es.go b/cmd/streamer_into_es.go new file mode 100644 index 0000000..435d4de --- /dev/null +++ b/cmd/streamer_into_es.go @@ -0,0 +1,95 @@ +package main + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/Leufolium/test/dbstruct" + "github.com/Leufolium/test/es" + "github.com/Leufolium/test/mongo" + "github.com/Leufolium/test/util" + "github.com/mozillazg/go-pinyin" +) + +func ImportStreamerIntoEs() { + fmt.Println("Start importing...") + + mcli, err := mongo.NewMongo() + if err != nil { + fmt.Printf("mongo client init fail : %v", err) + return + } + + escli, err := es.NewElasticSearch() + if err != nil { + fmt.Printf("elastic search client init fail : %v", err) + return + } + + ctx := context.Background() + + limit := 1000 + offset := 0 + + streamers, err := mcli.GetStreamerList(ctx, offset, limit) + if err != nil { + fmt.Printf("GetStreamerListfail : %v", err) + return + } + mids := make([]int64, 0) + for _, streamer := range streamers { + mids = append(mids, streamer.GetMid()) + } + accounts, _ := mcli.GetAccountListByMids(ctx, mids) + acctMp := make(map[int64]*dbstruct.Account) + for _, acct := range accounts { + acctMp[acct.GetMid()] = acct + } + zones, _ := mcli.GetZoneListByMids(ctx, mids) + zoneMp := make(map[int64]*dbstruct.Zone) + for _, zone := range zones { + zoneMp[zone.GetMid()] = zone + } + + streameraccts := make([]*dbstruct.EsStreamerAcct, 0) + for _, streamer := range streamers { + zone, ok := zoneMp[streamer.GetMid()] + lastZoneMomentCreateDay := int64(0) + if ok && zone.GetLastZoneMomentCt() != 0 { + lastZoneMomentCreateDay = util.GetDayStartTimeStamp(time.Unix(zone.GetLastZoneMomentCt(), 0)) + } + name := "" + py := "" + userIdString := "" + acct, ok := acctMp[streamer.GetMid()] + if ok { + name = acct.GetName() + py = strings.Join(pinyin.LazyConvert(acct.GetName(), nil), "") + userIdString = util.DerefString(acct.UserIdString) + } + streameraccts = append(streameraccts, &dbstruct.EsStreamerAcct{ + Mid: streamer.GetMid(), + Name: name, + UserIdString: userIdString, + PinYin: py, + Age: util.DerefInt64(streamer.Age), + Height: util.DerefInt64(streamer.Height), + Weight: util.DerefInt64(streamer.Weight), + City: util.DerefString(streamer.City), + Constellation: util.DerefString(streamer.Constellation), + LastZoneMomentCreateDayStart: lastZoneMomentCreateDay, + Ct: time.Now().Unix(), + Ut: time.Now().Unix(), + DelFlag: 0, + }) + } + err = escli.CreateStreamerAcct(ctx, streameraccts) + if err != nil { + fmt.Printf("CreateStreamerAcct : %v", err) + return + } + + fmt.Println("End importing...") +} diff --git a/dbstruct/auditcomponent.go b/dbstruct/auditcomponent.go new file mode 100644 index 0000000..04be4b4 --- /dev/null +++ b/dbstruct/auditcomponent.go @@ -0,0 +1,20 @@ +package dbstruct + +type AuditComponent struct { + AuditStatus *int64 `json:"audit_status" bson:"audit_status"` // 审核状态 + AuditOpinion *string `json:"audit_opinion" bson:"audit_opinion"` // 审核意见 +} + +func (p *AuditComponent) GetAuditStatus() int64 { + if p == nil || p.AuditStatus == nil { + return 0 + } + return *p.AuditStatus +} + +func (p *AuditComponent) GetAuditOpinion() string { + if p == nil || p.AuditOpinion == nil { + return "" + } + return *p.AuditOpinion +} diff --git a/dbstruct/streamer.go b/dbstruct/streamer.go new file mode 100644 index 0000000..7441cb7 --- /dev/null +++ b/dbstruct/streamer.go @@ -0,0 +1,59 @@ +package dbstruct + +type Streamer struct { + Id *int64 `json:"id" bson:"_id"` // 主播表id + Mid *int64 `json:"mid" bson:"mid"` // 用户表id + Gender *int64 `json:"gender" bson:"gender"` // 性别 + Bio *string `json:"bio" bson:"bio"` // 个性签名 + Cover *MediaComponent `json:"cover" bson:"cover"` // 封面 + Shorts *MediaComponent `json:"shorts" bson:"shorts"` // 展示视频 + Album *MediaComponent `json:"album" bson:"album"` // 相册 + Age *int64 `json:"age" bson:"age"` // 年龄 + Height *int64 `json:"height" bson:"height"` // 身高 + Weight *int64 `json:"weight" bson:"weight"` // 体重 + Constellation *string `json:"constellation" bson:"constellation"` // 星座 + City *string `json:"city" bson:"city"` // 所在城市 + Tag *[]string `json:"tag" bson:"tag"` // 主播标签 + Fans *int64 `json:"fans" bson:"fans"` // 全网粉丝 + AutoResponseMessage *string `json:"auto_response_message" bson:"auto_response_message"` // 自动回复消息 + Inviters *[]int64 `json:"inviters" bson:"inviters"` // 邀请人 + IsHided *int64 `json:"is_hided" bson:"is_hided"` // 是否隐藏 + Ct *int64 `json:"ct" bson:"ct"` // 创建时间 + Ut *int64 `json:"ut" bson:"ut"` // 更新时间 + DelFlag *int64 `json:"del_flag" bson:"del_flag"` // 删除标记 + CoverAudit *AuditComponent `bson:"cover_audit"` // 封面审核 + ShortsAudit *AuditComponent `bson:"shorts_audit"` // 展示视频审核 + AlbumAudit *AuditComponent `bson:"album_audit"` // 相册审核 + BioAudit *AuditComponent `bson:"bio_audit"` // 个性签名审核 + AutoResponseMessageAudit *AuditComponent `bson:"auto_response_message_audit"` // 自动回复消息审核 + CoverReview *AuditComponent `bson:"cover_review"` // 封面人审 + ShortsReview *AuditComponent `bson:"shorts_review"` // 展示视频人审 + AlbumReview *AuditComponent `bson:"album_review"` // 相册人审 + BioReview *AuditComponent `bson:"bio_review"` // 个性签名人审 + AutoResponseMessageReview *AuditComponent `bson:"auto_response_message_review"` // 自动回复消息人审 + + WechatLockType *int32 `json:"wechat_lock_type"` // 微信解锁方式 + WechatContact *string `json:"wechat_contact"` // 微信联系方式 + WechatCoinPrice *int64 `json:"wechat_coin_price"` // 微信金币价格 +} + +func (p *Streamer) GetMid() int64 { + if p == nil || p.Mid == nil { + return -1 + } + return *p.Mid +} + +func (p *Streamer) GetBio() string { + if p == nil || p.Bio == nil { + return "" + } + return *p.Bio +} + +func (p *Streamer) GetAutoResponseMessage() string { + if p == nil || p.AutoResponseMessage == nil { + return "" + } + return *p.AutoResponseMessage +} diff --git a/dbstruct/streamer_acct.go b/dbstruct/streamer_acct.go index 10a1bd9..7e5ce69 100644 --- a/dbstruct/streamer_acct.go +++ b/dbstruct/streamer_acct.go @@ -1,11 +1,17 @@ package dbstruct type EsStreamerAcct struct { - Mid int64 `json:"id"` // 用户表Id - Name string `json:"name"` // 用户名 - UserIdString string `json:"user_id_string"` // string型user_id,为模糊匹配设置 - PinYin string `json:"pinyin"` // 拼音 - Ct int64 `json:"ct"` // 创建时间 - Ut int64 `json:"ut"` // 更新时间 - DelFlag int64 `json:"del_flag"` // 删除标记,0-否,1-是 + Mid int64 `json:"id"` // 用户表Id + Name string `json:"name"` // 用户名 + UserIdString string `json:"user_id_string"` // string型user_id,为模糊匹配设置 + PinYin string `json:"pinyin"` // 拼音 + Age int64 `json:"age"` // 年龄 + Height int64 `json:"height"` // 身高 + Weight int64 `json:"weight"` // 体重 + City string `json:"city"` // 所在城市 + Constellation string `json:"constellation"` // 星座 + LastZoneMomentCreateDayStart int64 `json:"last_zone_moment_create_day_start"` // 最后空间动态创建日始整点 + Ct int64 `json:"ct"` // 创建时间 + Ut int64 `json:"ut"` // 更新时间 + DelFlag int64 `json:"del_flag"` // 删除标记,0-否,1-是 } diff --git a/mongo/mongo.go b/mongo/mongo.go index b4f42d7..6f6038c 100644 --- a/mongo/mongo.go +++ b/mongo/mongo.go @@ -49,6 +49,9 @@ const ( DBZoneMoment = "zone_moment" COLZoneMoment = "zone_moment" + + DBStreamer = "streamer" + COLStreamer = "streamer" ) func (m *Mongo) getColUserIdSeq() *qmgo.Collection { @@ -84,6 +87,11 @@ func (m *Mongo) getColZoneMoment() *qmgo.Collection { return m.clientMix.Database(DBZoneMoment).Collection(COLZoneMoment) } +// 主播表 +func (m *Mongo) getColStreamer() *qmgo.Collection { + return m.clientMix.Database(DBStreamer).Collection(COLStreamer) +} + func (m *Mongo) CreateBatch(ctx context.Context, userIdSeqs []*UserIdMap) error { col := m.getColUserIdMap() _, err := col.InsertMany(ctx, userIdSeqs) @@ -245,3 +253,18 @@ func (m *Mongo) GetZoneMomentCountByMid(ctx context.Context, mid int64) (int64, } return col.Find(ctx, query).Count() } + +func (m *Mongo) GetStreamerList(ctx context.Context, offset, limit int) ([]*dbstruct.Streamer, error) { + list := make([]*dbstruct.Streamer, 0) + col := m.getColStreamer() + + query := qmgo.M{ + "del_flag": 0, + } + err := col.Find(ctx, query).Sort("_id").Skip(int64(offset)).Limit(int64(limit)).All(&list) + if err == qmgo.ErrNoSuchDocuments { + err = nil + return list, err + } + return list, err +} diff --git a/util/ptrreader.go b/util/ptrreader.go new file mode 100644 index 0000000..2d3ec60 --- /dev/null +++ b/util/ptrreader.go @@ -0,0 +1,106 @@ +package util + +import "unsafe" + +type eface struct { + v int64 + ptr unsafe.Pointer +} + +const ( + DefaultBool bool = false + DefaultString string = "" + DefaultInt int = 0 + DefaultInt32 int32 = 0 + DefaultInt64 int64 = 0 + DefaultFloat32 float32 = 0 + DefaultFloat64 float64 = 0 +) + +func DerefBool(p *bool) bool { + if p != nil { + return *p + } + return DefaultBool +} + +func DerefString(p *string) string { + if p != nil { + return *p + } + return DefaultString +} + +func DerefInt(p *int) int { + if p != nil { + return *p + } + return DefaultInt +} + +func DerefInt32(p *int32) int32 { + if p != nil { + return *p + } + return DefaultInt32 +} + +func DerefInt64(p *int64) int64 { + if p != nil { + return *p + } + return DefaultInt64 +} + +func DerefFloat32(p *float32) float32 { + if p != nil { + return *p + } + return DefaultFloat32 +} + +func DerefFloat64(p *float64) float64 { + if p != nil { + return *p + } + return DefaultFloat64 +} + +func Int64Slice(v []int64) *[]int64 { + return &v +} + +func DerefInt64Slice(p *[]int64) []int64 { + if p != nil { + return *p + } + return make([]int64, 0) +} + +func DerefStringSlice(p *[]string) []string { + if p != nil { + return *p + } + return make([]string, 0) +} + +// 判断转为any的指针是否为空指针 +func IsElemNil(elem any) bool { + efaceptr := (*eface)(unsafe.Pointer(&elem)) + if efaceptr == nil || uintptr(efaceptr.ptr) == 0x0 { + return true + } + return false +} + +// 判断转为any的切片是否为空切片 +func IsSliceNil(elem any) bool { + efaceptr := (*eface)(unsafe.Pointer(&elem)) + if efaceptr == nil || uintptr(efaceptr.ptr) == 0x0 { + return true + } + + var sliceInfo = *(*[3]int)(efaceptr.ptr) + + return sliceInfo[1] == 0 +} diff --git a/util/util.go b/util/util.go new file mode 100644 index 0000000..1c11fbd --- /dev/null +++ b/util/util.go @@ -0,0 +1,20 @@ +package util + +import ( + "fmt" + "time" +) + +// 获取0点时间戳 +func GetDayStartTimeStamp(t time.Time) int64 { + loc, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + loc = time.FixedZone("CST", 8*3600) + } + timeStr := fmt.Sprintf("%02d-%02d-%02d 00:00:00", t.Year(), t.Month(), t.Day()) + duetimecst, err := time.ParseInLocation("2006-1-2 15:04:05", timeStr, loc) + if err != nil { + fmt.Printf("parse error : %v\n", err) + } + return duetimecst.Unix() +}