diff --git a/cmd/main.go b/cmd/main.go
index 830dd60..708498a 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -2,10 +2,8 @@ package main
 
 import (
 	"data_prep/importfunc"
-	"fmt"
 )
 
 func main() {
-	fmt.Println("Importing")
 	importfunc.ImportUserIdMap()
 }
diff --git a/importfunc/generate_user_id_map.go b/importfunc/generate_user_id_map.go
index 9587e36..6256efc 100644
--- a/importfunc/generate_user_id_map.go
+++ b/importfunc/generate_user_id_map.go
@@ -8,14 +8,6 @@ import (
 )
 
 func GenerateUserIdMap() {
-	outfile, err := os.OpenFile("/Users/Caeru/dataprep/file/user_id_map.txt", os.O_WRONLY|os.O_CREATE, 0666)
-	if err != nil {
-		fmt.Printf("Open File Err : %v", err)
-	}
-
-	defer outfile.Close()
-
-	writer := bufio.NewWriter(outfile)
 
 	list := make([]int64, 0)
 	for i := int64(1000000); i < 100000000; i++ {
@@ -25,9 +17,45 @@ func GenerateUserIdMap() {
 		list[i], list[j] = list[j], list[i]
 	})
 
-	for i := range list {
-		outfile.WriteString(fmt.Sprintf("%v:%v\n", 1000000+i, list[i]))
+	// Split the shuffled IDs into files of one million entries each. The
+	// upper bound of a slice expression is exclusive, so it has to be
+	// i+1000000; i+999999 silently dropped one ID per file.
+	j := 1
+	for i := 0; i < len(list); i += 1000000 {
+		lower := i
+		upper := i + 1000000
+		if upper > len(list) {
+			upper = len(list)
+		}
+		WriteAsList(list[lower:upper], j)
+		j++
 	}
 
-	writer.Flush()
+}
+
+// WriteAsList writes one chunk of the shuffled user IDs to
+// user_id_map<seq>.txt as "sequence:userId" lines. Sequence numbers start at
+// 1000000*seq and increase by one per entry of list.
+func WriteAsList(list []int64, seq int) {
+	pathurl := fmt.Sprintf("/Users/Caeru/Desktop/file/user_id_map%v.txt", seq)
+	// O_TRUNC: a rerun must not leave stale lines from a longer previous file.
+	outfile, err := os.OpenFile(pathurl, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
+	if err != nil {
+		fmt.Printf("Open File Err : %v", err)
+		return
+	}
+	defer outfile.Close()
+
+	// Write through the buffered writer; writing to outfile directly would
+	// bypass the buffer and make Flush a no-op.
+	writer := bufio.NewWriter(outfile)
+	for i := range list {
+		fmt.Fprintf(writer, "%v:%v\n", 1000000*seq+i, list[i])
+	}
+	// bufio.Writer remembers the first write error and reports it from Flush.
+	if err := writer.Flush(); err != nil {
+		fmt.Printf("Write File Err : %v", err)
+		return
+	}
+	fmt.Printf("%dth file has generated\n", seq)
 }
diff --git a/importfunc/import_user_id_map.go b/importfunc/import_user_id_map.go
index ff8f2eb..a590dff 100644
--- a/importfunc/import_user_id_map.go
+++ b/importfunc/import_user_id_map.go
@@ -1,9 +1,13 @@
 package importfunc
 
 import (
+	"bufio"
 	"context"
 	"fmt"
-	"math/rand"
+	"io"
+	"os"
+	"strconv"
+	"strings"
 
 	"data_prep/dbstruct"
 	"data_prep/mongo"
@@ -18,23 +22,68 @@ func ImportUserIdMap() {
 	}
 
 	ctx := context.Background()
-	// 打乱
-	list := make([]int64, 0)
-	for i := int64(1000000); i < 100000000; i++ {
-		list = append(list, i)
+	userIdSeq, err := mcli.GetUserIdImportSeq(ctx)
+	if err != nil {
+		fmt.Printf("GetUserIdImportSeq fail : %v", err)
+		return
 	}
-	rand.Shuffle(99000000, func(i, j int) {
-		list[i], list[j] = list[j], list[i]
-	})
 
-	for i := 0; i < len(list); i += 1000000 {
-		lower := i
-		upper := i + 999999
-		if upper >= len(list) {
-			upper = len(list) - 1
-		}
-		ImportAsList(mcli, ctx, list[lower:upper], i)
+	pathurl := fmt.Sprintf("/app/data_prep/file/user_id_map%v.txt", userIdSeq.Seq)
+
+	fmt.Printf("Importing %dth", userIdSeq.Seq)
+	infile, err := os.Open(pathurl)
+	if err != nil {
+		fmt.Printf("Open File Err : %v", err)
+		return
 	}
+	defer infile.Close()
+	reader := bufio.NewReader(infile)
 
+	// batchSize is how many rows go into one CreateMappedUserIds call.
+	const batchSize = 1000
+	userIdMaps := make([]*dbstruct.UserIdMap, 0, batchSize)
+	for {
+		str, readErr := reader.ReadString('\n')
+		if readErr != nil && readErr != io.EOF {
+			fmt.Printf("Read File Err : %v", readErr)
+			return
+		}
+		// Trim the trailing newline before parsing (strconv rejects "123\n"),
+		// and skip blank lines without skipping the EOF check below.
+		if line := strings.TrimSpace(str); line != "" {
+			strs := strings.SplitN(line, ":", 2)
+			if len(strs) != 2 {
+				fmt.Printf("Malformed line : %q", line)
+				return
+			}
+			seq, err := strconv.ParseInt(strs[0], 10, 64)
+			if err != nil {
+				fmt.Printf("Parse seq err : %v", err)
+				return
+			}
+			userId, err := strconv.ParseInt(strs[1], 10, 64)
+			if err != nil {
+				fmt.Printf("Parse userId err : %v", err)
+				return
+			}
+			userIdMaps = append(userIdMaps, &dbstruct.UserIdMap{
+				Seq:    seq,
+				UserId: userId,
+			})
+		}
+		// Send a full batch, or whatever is left once the file is exhausted.
+		if len(userIdMaps) == batchSize || (readErr == io.EOF && len(userIdMaps) > 0) {
+			if err := mcli.CreateMappedUserIds(ctx, userIdMaps); err != nil {
+				fmt.Printf("CreateMappedUserIds err :%v", err)
+				return
+			}
+			// Start a fresh batch so rows are never inserted twice.
+			userIdMaps = make([]*dbstruct.UserIdMap, 0, batchSize)
+		}
+		if readErr == io.EOF {
+			break
+		}
+	}
+	fmt.Printf("%dth imported", userIdSeq.Seq)
 	fmt.Printf("Import into test success")
 }
diff --git a/mongo/mongo.go b/mongo/mongo.go
index 81f1246..fc1382b 100644
--- a/mongo/mongo.go
+++ b/mongo/mongo.go
@@ -32,9 +32,10 @@ func NewMongo() (mongo *Mongo, err error) {
 }
 
 const (
-	DBUserIdSeq  = "user_id_seq"
-	COLUserIdSeq = "user_id_seq"
-	COLUserIdMap = "user_id_map"
+	DBUserIdSeq        = "user_id_seq"
+	COLUserIdSeq       = "user_id_seq"
+	COLUserIdMap       = "user_id_map"
+	COLUserIdImportSeq = "user_id_import_seq"
 
 	DBAccountIdSeq  = "account_id_seq"
 	COLAccountIdSeq = "account_id_seq"
@@ -74,6 +75,11 @@ func (m *Mongo) getColUserIdMap() *qmgo.Collection {
 	return m.clientMix.Database(DBUserIdSeq).Collection(COLUserIdMap)
 }
 
+// getColUserIdImportSeq returns the user_id_import_seq collection of the user_id_seq database.
+func (m *Mongo) getColUserIdImportSeq() *qmgo.Collection {
+	return m.clientMix.Database(DBUserIdSeq).Collection(COLUserIdImportSeq)
+}
+
 // Account表
 func (m *Mongo) getColAccount() *qmgo.Collection {
 	return m.clientMix.Database(DBAccount).Collection(COLAccount)
@@ -336,3 +342,14 @@ func (m *Mongo) GetZoneVasByIds(ctx context.Context, zids []int64) ([]*dbstruct.
 	}
 	return list, nil
 }
+
+// GetUserIdImportSeq loads the document whose _id is "seq_id" from the
+// user_id_import_seq collection. It returns a nil pointer together with the
+// error when the lookup fails.
+func (m *Mongo) GetUserIdImportSeq(ctx context.Context) (*dbstruct.UserIdSeq, error) {
+	userIdSeq := &dbstruct.UserIdSeq{}
+	if err := m.getColUserIdImportSeq().Find(ctx, qmgo.M{"_id": "seq_id"}).One(userIdSeq); err != nil {
+		return nil, err
+	}
+	return userIdSeq, nil
+}