This commit is contained in:
Leufolium 2024-08-15 04:56:58 +08:00
parent b179e3fb5d
commit b63febdd66
8 changed files with 337 additions and 714 deletions

View File

@ -1,711 +1,5 @@
package main package main
import (
"bufio"
"context"
"encoding/base64"
"fmt"
"os"
myCrypto "github.com/Leufolium/test/crypto"
"github.com/Leufolium/test/dbstruct"
"github.com/Leufolium/test/mongo"
"github.com/Leufolium/test/mysql"
)
func main() { func main() {
fmt.Println("Start importing...") ImportStreamerIntoEs()
cryptoService := new(myCrypto.CryptoService)
cryptoService.Init()
client, err := mongo.NewMongo()
if err != nil {
fmt.Printf("mongo client init fail : %v", err)
return
}
mysqlcli, err := mysql.NewMysql()
if err != nil {
fmt.Printf("mysql client init fail : %v", err)
return
}
ctx := context.Background()
mids := []int64{
3,
4,
5,
6,
7,
8,
9,
10,
11,
12,
14,
15,
16,
17,
19,
20,
21,
22,
23,
24,
25,
26,
27,
28,
29,
30,
31,
34,
35,
36,
37,
38,
44,
48,
50,
58,
61,
62,
63,
64,
65,
66,
67,
68,
69,
71,
72,
73,
74,
75,
76,
77,
78,
79,
81,
82,
83,
85,
87,
88,
89,
91,
92,
93,
94,
95,
96,
97,
98,
99,
100,
101,
102,
103,
104,
105,
106,
107,
109,
110,
111,
112,
113,
115,
116,
117,
118,
119,
120,
121,
122,
123,
124,
125,
127,
131,
132,
133,
134,
135,
136,
137,
138,
139,
140,
141,
142,
144,
145,
146,
147,
148,
149,
150,
151,
159,
245,
283,
447,
493,
501,
507,
750,
779,
876,
897,
898,
996,
1071,
1188,
1258,
1259,
1336,
1365,
1471,
1624,
1649,
1740,
1832,
1850,
1958,
1982,
2391,
2432,
2508,
2608,
2634,
2686,
2895,
2953,
2997,
3169,
3343,
3364,
3367,
3489,
3536,
3940,
4474,
4497,
4865,
5051,
5161,
5545,
5620,
5681,
6069,
6145,
6520,
6856,
7520,
7875,
8184,
9429,
10351,
11146,
11583,
12771,
12955,
13208,
13324,
13820,
13862,
15851,
15949,
15966,
16311,
18370,
18488,
19230,
19316,
20475,
21951,
22358,
23152,
24457,
24582,
25058,
25114,
25315,
26283,
29980,
31476,
31551,
33487,
34365,
35042,
35373,
36032,
36540,
37567,
37634,
37939,
38971,
39190,
40441,
42720,
46302,
48900,
49436,
51444,
51609,
52499,
54525,
55108,
57245,
57272,
60553,
60978,
61060,
65028,
66771,
66930,
66961,
67378,
67379,
67383,
67420,
68283,
68559,
69379,
70822,
71287,
76278,
76723,
77377,
79390,
79843,
80077,
82391,
82414,
84663,
85910,
87404,
87907,
89818,
90281,
90356,
91236,
91856,
93963,
97359,
97857,
99796,
100601,
103920,
105604,
143961,
144384,
145309,
145740,
145806,
146166,
146340,
146460,
146545,
147322,
148509,
152001,
153632,
158395,
159566,
161609,
162710,
162816,
163374,
163608,
163660,
164533,
164630,
166037,
167340,
169095,
172477,
172731,
173051,
173304,
174440,
175834,
176342,
177637,
178210,
179782,
180889,
182308,
184293,
185893,
187760,
190618,
191324,
192434,
192549,
193781,
194448,
195763,
197811,
199188,
202826,
204372,
205940,
206164,
206580,
208778,
209473,
210852,
212561,
213014,
213554,
214911,
215008,
215135,
215240,
217606,
217860,
218851,
220411,
226924,
227196,
228265,
228359,
229743,
230965,
239368,
240225,
242263,
243225,
243875,
249042,
252780,
254325,
254587,
258846,
259111,
259476,
263758,
263921,
264877,
267713,
267884,
268493,
268954,
269339,
269583,
269632,
270017,
270232,
270602,
270888,
273575,
274697,
275013,
275732,
276039,
276071,
276120,
276466,
279590,
280318,
280544,
293390,
293753,
296457,
296564,
300771,
301759,
302200,
308732,
308873,
310356,
311294,
312807,
317098,
317333,
320610,
322645,
323517,
326483,
327391,
328717,
329947,
330130,
330205,
330649,
335318,
335373,
335577,
335736,
337587,
337626,
337829,
338334,
338605,
341103,
341351,
341834,
342076,
342468,
342969,
344012,
344664,
344844,
345300,
345521,
345859,
346732,
347900,
348187,
348500,
348616,
348991,
349805,
351513,
352007,
352353,
352787,
352921,
352924,
353051,
353654,
353681,
353921,
354058,
354125,
354236,
355229,
356071,
356408,
356641,
356737,
357096,
357146,
357181,
357669,
358886,
359048,
359406,
360080,
360323,
360412,
361340,
365051,
366194,
367375,
367849,
370070,
370793,
371145,
371600,
373789,
374749,
376093,
376243,
377767,
378027,
378518,
378891,
379005,
379253,
380316,
380647,
380765,
381577,
382101,
382874,
383225,
386910,
387243,
388957,
389643,
389928,
390102,
391886,
392278,
392896,
393152,
393298,
393529,
396777,
397554,
399206,
399212,
399876,
401922,
402096,
404583,
404820,
406241,
406445,
408607,
408708,
408893,
409130,
409145,
409201,
410700,
412300,
412533,
412764,
412769,
414419,
414568,
414570,
414996,
416251,
418307,
420413,
421224,
421229,
421408,
421498,
421672,
421876,
421951,
422423,
424452,
425002,
425180,
425534,
425674,
426274,
426738,
427317,
429542,
429897,
430335,
430734,
431977,
432362,
432447,
432893,
432959,
435296,
436833,
436883,
437166,
437347,
437450,
437526,
437934,
437955,
438312,
438643,
439926,
440514,
440977,
441008,
441744,
441793,
441883,
443406,
445007,
445068,
445162,
445215,
445553,
445716,
445860,
446143,
446204,
446887,
447665,
447925,
449525,
449933,
450124,
450203,
450388,
450488,
450739,
450832,
450834,
451329,
451333,
451444,
451513,
451771,
454716,
454721,
455290,
455709,
456325,
456385,
456449,
456473,
456921,
457540,
458707,
459369,
459482,
459716,
459920,
460252,
460716,
}
accounts, err := client.GetAccountListByMids(ctx, mids)
if err != nil {
fmt.Printf("GetAccountListByMids fail, err :%v\n", err)
}
zones, err := client.GetZoneListByMids(ctx, mids)
if err != nil {
fmt.Printf("GetZoneListByMids fail, err :%v\n", err)
}
outfilePath := "/app/dataprep/outfile.txt"
outfile, err := os.OpenFile(outfilePath, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
fmt.Printf("Open File Err : %v", err)
}
defer outfile.Close()
writer := bufio.NewWriter(outfile)
zoneMp := make(map[int64]*dbstruct.Zone, 0)
zids := make([]int64, 0)
for _, zone := range zones {
zoneMp[zone.GetMid()] = zone
zids = append(zids, zone.GetId())
}
zcs, err := mysqlcli.GetZoneMemberCountGroupByZid(ctx, nil, zids)
if err != nil {
fmt.Printf("GetZoneMemberCountGroupByZid Err : %v", err)
}
zcMp := make(map[int64]int64, 0)
for _, zc := range zcs {
zcMp[zc.GetZid()] = zc.GetNum()
}
for _, account := range accounts {
base64DecryptedBytes, _ := base64.StdEncoding.DecodeString(*account.MobilePhone)
phone, err := cryptoService.DecryptByAES(base64DecryptedBytes)
if err != nil {
fmt.Printf("DecryptByAES err :%v\n", err)
}
phoneStr := string(phone)
mid := account.GetMid()
name := account.GetName()
if name == "账号已注销" {
continue
}
userId := account.GetUserId()
count := int64(0)
membercount := int64(0)
zone, ok := zoneMp[mid]
if ok {
count = zone.GetZoneMomentCount()
membercount = zcMp[zone.GetId()]
}
str := fmt.Sprintf("%v\t%v\t%v\t%v\t%v\t%v\n", mid, name, userId, phoneStr, count, membercount)
fmt.Printf("%v", str)
writer.WriteString(str)
}
fmt.Println("End importing...")
} }

95
cmd/streamer_into_es.go Normal file
View File

@ -0,0 +1,95 @@
package main
import (
"context"
"fmt"
"strings"
"time"
"github.com/Leufolium/test/dbstruct"
"github.com/Leufolium/test/es"
"github.com/Leufolium/test/mongo"
"github.com/Leufolium/test/util"
"github.com/mozillazg/go-pinyin"
)
func ImportStreamerIntoEs() {
fmt.Println("Start importing...")
mcli, err := mongo.NewMongo()
if err != nil {
fmt.Printf("mongo client init fail : %v", err)
return
}
escli, err := es.NewElasticSearch()
if err != nil {
fmt.Printf("elastic search client init fail : %v", err)
return
}
ctx := context.Background()
limit := 1000
offset := 0
streamers, err := mcli.GetStreamerList(ctx, offset, limit)
if err != nil {
fmt.Printf("GetStreamerListfail : %v", err)
return
}
mids := make([]int64, 0)
for _, streamer := range streamers {
mids = append(mids, streamer.GetMid())
}
accounts, _ := mcli.GetAccountListByMids(ctx, mids)
acctMp := make(map[int64]*dbstruct.Account)
for _, acct := range accounts {
acctMp[acct.GetMid()] = acct
}
zones, _ := mcli.GetZoneListByMids(ctx, mids)
zoneMp := make(map[int64]*dbstruct.Zone)
for _, zone := range zones {
zoneMp[zone.GetMid()] = zone
}
streameraccts := make([]*dbstruct.EsStreamerAcct, 0)
for _, streamer := range streamers {
zone, ok := zoneMp[streamer.GetMid()]
lastZoneMomentCreateDay := int64(0)
if ok && zone.GetLastZoneMomentCt() != 0 {
lastZoneMomentCreateDay = util.GetDayStartTimeStamp(time.Unix(zone.GetLastZoneMomentCt(), 0))
}
name := ""
py := ""
userIdString := ""
acct, ok := acctMp[streamer.GetMid()]
if ok {
name = acct.GetName()
py = strings.Join(pinyin.LazyConvert(acct.GetName(), nil), "")
userIdString = util.DerefString(acct.UserIdString)
}
streameraccts = append(streameraccts, &dbstruct.EsStreamerAcct{
Mid: streamer.GetMid(),
Name: name,
UserIdString: userIdString,
PinYin: py,
Age: util.DerefInt64(streamer.Age),
Height: util.DerefInt64(streamer.Height),
Weight: util.DerefInt64(streamer.Weight),
City: util.DerefString(streamer.City),
Constellation: util.DerefString(streamer.Constellation),
LastZoneMomentCreateDayStart: lastZoneMomentCreateDay,
Ct: time.Now().Unix(),
Ut: time.Now().Unix(),
DelFlag: 0,
})
}
err = escli.CreateStreamerAcct(ctx, streameraccts)
if err != nil {
fmt.Printf("CreateStreamerAcct : %v", err)
return
}
fmt.Println("End importing...")
}

View File

@ -0,0 +1,20 @@
package dbstruct
type AuditComponent struct {
AuditStatus *int64 `json:"audit_status" bson:"audit_status"` // 审核状态
AuditOpinion *string `json:"audit_opinion" bson:"audit_opinion"` // 审核意见
}
func (p *AuditComponent) GetAuditStatus() int64 {
if p == nil || p.AuditStatus == nil {
return 0
}
return *p.AuditStatus
}
func (p *AuditComponent) GetAuditOpinion() string {
if p == nil || p.AuditOpinion == nil {
return ""
}
return *p.AuditOpinion
}

59
dbstruct/streamer.go Normal file
View File

@ -0,0 +1,59 @@
package dbstruct
type Streamer struct {
Id *int64 `json:"id" bson:"_id"` // 主播表id
Mid *int64 `json:"mid" bson:"mid"` // 用户表id
Gender *int64 `json:"gender" bson:"gender"` // 性别
Bio *string `json:"bio" bson:"bio"` // 个性签名
Cover *MediaComponent `json:"cover" bson:"cover"` // 封面
Shorts *MediaComponent `json:"shorts" bson:"shorts"` // 展示视频
Album *MediaComponent `json:"album" bson:"album"` // 相册
Age *int64 `json:"age" bson:"age"` // 年龄
Height *int64 `json:"height" bson:"height"` // 身高
Weight *int64 `json:"weight" bson:"weight"` // 体重
Constellation *string `json:"constellation" bson:"constellation"` // 星座
City *string `json:"city" bson:"city"` // 所在城市
Tag *[]string `json:"tag" bson:"tag"` // 主播标签
Fans *int64 `json:"fans" bson:"fans"` // 全网粉丝
AutoResponseMessage *string `json:"auto_response_message" bson:"auto_response_message"` // 自动回复消息
Inviters *[]int64 `json:"inviters" bson:"inviters"` // 邀请人
IsHided *int64 `json:"is_hided" bson:"is_hided"` // 是否隐藏
Ct *int64 `json:"ct" bson:"ct"` // 创建时间
Ut *int64 `json:"ut" bson:"ut"` // 更新时间
DelFlag *int64 `json:"del_flag" bson:"del_flag"` // 删除标记
CoverAudit *AuditComponent `bson:"cover_audit"` // 封面审核
ShortsAudit *AuditComponent `bson:"shorts_audit"` // 展示视频审核
AlbumAudit *AuditComponent `bson:"album_audit"` // 相册审核
BioAudit *AuditComponent `bson:"bio_audit"` // 个性签名审核
AutoResponseMessageAudit *AuditComponent `bson:"auto_response_message_audit"` // 自动回复消息审核
CoverReview *AuditComponent `bson:"cover_review"` // 封面人审
ShortsReview *AuditComponent `bson:"shorts_review"` // 展示视频人审
AlbumReview *AuditComponent `bson:"album_review"` // 相册人审
BioReview *AuditComponent `bson:"bio_review"` // 个性签名人审
AutoResponseMessageReview *AuditComponent `bson:"auto_response_message_review"` // 自动回复消息人审
WechatLockType *int32 `json:"wechat_lock_type"` // 微信解锁方式
WechatContact *string `json:"wechat_contact"` // 微信联系方式
WechatCoinPrice *int64 `json:"wechat_coin_price"` // 微信金币价格
}
func (p *Streamer) GetMid() int64 {
if p == nil || p.Mid == nil {
return -1
}
return *p.Mid
}
func (p *Streamer) GetBio() string {
if p == nil || p.Bio == nil {
return ""
}
return *p.Bio
}
func (p *Streamer) GetAutoResponseMessage() string {
if p == nil || p.AutoResponseMessage == nil {
return ""
}
return *p.AutoResponseMessage
}

View File

@ -1,11 +1,17 @@
package dbstruct package dbstruct
type EsStreamerAcct struct { type EsStreamerAcct struct {
Mid int64 `json:"id"` // 用户表Id Mid int64 `json:"id"` // 用户表Id
Name string `json:"name"` // 用户名 Name string `json:"name"` // 用户名
UserIdString string `json:"user_id_string"` // string型user_id为模糊匹配设置 UserIdString string `json:"user_id_string"` // string型user_id为模糊匹配设置
PinYin string `json:"pinyin"` // 拼音 PinYin string `json:"pinyin"` // 拼音
Ct int64 `json:"ct"` // 创建时间 Age int64 `json:"age"` // 年龄
Ut int64 `json:"ut"` // 更新时间 Height int64 `json:"height"` // 身高
DelFlag int64 `json:"del_flag"` // 删除标记0-否1-是 Weight int64 `json:"weight"` // 体重
City string `json:"city"` // 所在城市
Constellation string `json:"constellation"` // 星座
LastZoneMomentCreateDayStart int64 `json:"last_zone_moment_create_day_start"` // 最后空间动态创建日始整点
Ct int64 `json:"ct"` // 创建时间
Ut int64 `json:"ut"` // 更新时间
DelFlag int64 `json:"del_flag"` // 删除标记0-否1-是
} }

View File

@ -49,6 +49,9 @@ const (
DBZoneMoment = "zone_moment" DBZoneMoment = "zone_moment"
COLZoneMoment = "zone_moment" COLZoneMoment = "zone_moment"
DBStreamer = "streamer"
COLStreamer = "streamer"
) )
func (m *Mongo) getColUserIdSeq() *qmgo.Collection { func (m *Mongo) getColUserIdSeq() *qmgo.Collection {
@ -84,6 +87,11 @@ func (m *Mongo) getColZoneMoment() *qmgo.Collection {
return m.clientMix.Database(DBZoneMoment).Collection(COLZoneMoment) return m.clientMix.Database(DBZoneMoment).Collection(COLZoneMoment)
} }
// 主播表
func (m *Mongo) getColStreamer() *qmgo.Collection {
return m.clientMix.Database(DBStreamer).Collection(COLStreamer)
}
func (m *Mongo) CreateBatch(ctx context.Context, userIdSeqs []*UserIdMap) error { func (m *Mongo) CreateBatch(ctx context.Context, userIdSeqs []*UserIdMap) error {
col := m.getColUserIdMap() col := m.getColUserIdMap()
_, err := col.InsertMany(ctx, userIdSeqs) _, err := col.InsertMany(ctx, userIdSeqs)
@ -245,3 +253,18 @@ func (m *Mongo) GetZoneMomentCountByMid(ctx context.Context, mid int64) (int64,
} }
return col.Find(ctx, query).Count() return col.Find(ctx, query).Count()
} }
func (m *Mongo) GetStreamerList(ctx context.Context, offset, limit int) ([]*dbstruct.Streamer, error) {
list := make([]*dbstruct.Streamer, 0)
col := m.getColStreamer()
query := qmgo.M{
"del_flag": 0,
}
err := col.Find(ctx, query).Sort("_id").Skip(int64(offset)).Limit(int64(limit)).All(&list)
if err == qmgo.ErrNoSuchDocuments {
err = nil
return list, err
}
return list, err
}

106
util/ptrreader.go Normal file
View File

@ -0,0 +1,106 @@
package util
import "unsafe"
type eface struct {
v int64
ptr unsafe.Pointer
}
const (
DefaultBool bool = false
DefaultString string = ""
DefaultInt int = 0
DefaultInt32 int32 = 0
DefaultInt64 int64 = 0
DefaultFloat32 float32 = 0
DefaultFloat64 float64 = 0
)
func DerefBool(p *bool) bool {
if p != nil {
return *p
}
return DefaultBool
}
func DerefString(p *string) string {
if p != nil {
return *p
}
return DefaultString
}
func DerefInt(p *int) int {
if p != nil {
return *p
}
return DefaultInt
}
func DerefInt32(p *int32) int32 {
if p != nil {
return *p
}
return DefaultInt32
}
func DerefInt64(p *int64) int64 {
if p != nil {
return *p
}
return DefaultInt64
}
func DerefFloat32(p *float32) float32 {
if p != nil {
return *p
}
return DefaultFloat32
}
func DerefFloat64(p *float64) float64 {
if p != nil {
return *p
}
return DefaultFloat64
}
func Int64Slice(v []int64) *[]int64 {
return &v
}
func DerefInt64Slice(p *[]int64) []int64 {
if p != nil {
return *p
}
return make([]int64, 0)
}
func DerefStringSlice(p *[]string) []string {
if p != nil {
return *p
}
return make([]string, 0)
}
// 判断转为any的指针是否为空指针
func IsElemNil(elem any) bool {
efaceptr := (*eface)(unsafe.Pointer(&elem))
if efaceptr == nil || uintptr(efaceptr.ptr) == 0x0 {
return true
}
return false
}
// 判断转为any的切片是否为空切片
func IsSliceNil(elem any) bool {
efaceptr := (*eface)(unsafe.Pointer(&elem))
if efaceptr == nil || uintptr(efaceptr.ptr) == 0x0 {
return true
}
var sliceInfo = *(*[3]int)(efaceptr.ptr)
return sliceInfo[1] == 0
}

20
util/util.go Normal file
View File

@ -0,0 +1,20 @@
package util
import (
"fmt"
"time"
)
// 获取0点时间戳
func GetDayStartTimeStamp(t time.Time) int64 {
loc, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
loc = time.FixedZone("CST", 8*3600)
}
timeStr := fmt.Sprintf("%02d-%02d-%02d 00:00:00", t.Year(), t.Month(), t.Day())
duetimecst, err := time.ParseInLocation("2006-1-2 15:04:05", timeStr, loc)
if err != nil {
fmt.Printf("parse error : %v\n", err)
}
return duetimecst.Unix()
}