This commit is contained in:
Leufolium 2024-09-23 15:15:44 +08:00
parent f373ab76ee
commit da3026fae5
4 changed files with 87 additions and 31 deletions

View File

@ -2,10 +2,8 @@ package main
import ( import (
"data_prep/importfunc" "data_prep/importfunc"
"fmt"
) )
func main() { func main() {
fmt.Println("Importing")
importfunc.ImportUserIdMap() importfunc.ImportUserIdMap()
} }

View File

@ -8,14 +8,6 @@ import (
) )
func GenerateUserIdMap() { func GenerateUserIdMap() {
outfile, err := os.OpenFile("/Users/Caeru/dataprep/file/user_id_map.txt", os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
fmt.Printf("Open File Err : %v", err)
}
defer outfile.Close()
writer := bufio.NewWriter(outfile)
list := make([]int64, 0) list := make([]int64, 0)
for i := int64(1000000); i < 100000000; i++ { for i := int64(1000000); i < 100000000; i++ {
@ -25,9 +17,33 @@ func GenerateUserIdMap() {
list[i], list[j] = list[j], list[i] list[i], list[j] = list[j], list[i]
}) })
for i := range list { j := 1
outfile.WriteString(fmt.Sprintf("%v:%v\n", 1000000+i, list[i])) for i := 0; i < len(list); i += 1000000 {
lower := i
upper := i + 999999
if upper >= len(list) {
upper = len(list) - 1
}
WriteAsList(list[lower:upper], j)
j++
} }
writer.Flush() }
func WriteAsList(list []int64, seq int) {
pathurl := fmt.Sprintf("/Users/Caeru/Desktop/file/user_id_map%v.txt", seq)
outfile, err := os.OpenFile(pathurl, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
fmt.Printf("Open File Err : %v", err)
}
defer outfile.Close()
writer := bufio.NewWriter(outfile)
for i := range list {
outfile.WriteString(fmt.Sprintf("%v:%v\n", 1000000*seq+i, list[i]))
}
writer.Flush()
fmt.Printf("%dth file has generated\n", seq)
} }

View File

@ -1,9 +1,13 @@
package importfunc package importfunc
import ( import (
"bufio"
"context" "context"
"fmt" "fmt"
"math/rand" "io"
"os"
"strconv"
"strings"
"data_prep/dbstruct" "data_prep/dbstruct"
"data_prep/mongo" "data_prep/mongo"
@ -18,23 +22,49 @@ func ImportUserIdMap() {
} }
ctx := context.Background() ctx := context.Background()
// 打乱 userIdSeq, err := mcli.GetUserIdImportSeq(ctx)
list := make([]int64, 0) if err != nil {
for i := int64(1000000); i < 100000000; i++ { fmt.Printf("GetUserIdImportSeq fail : %v", err)
list = append(list, i) return
} }
rand.Shuffle(99000000, func(i, j int) {
list[i], list[j] = list[j], list[i]
})
for i := 0; i < len(list); i += 1000000 { pathurl := fmt.Sprintf("/app/data_prep/file/user_id_map%v.txt", userIdSeq.Seq)
lower := i
upper := i + 999999 fmt.Printf("Importing %dth", userIdSeq.Seq)
if upper >= len(list) { infile, err := os.Open(pathurl)
upper = len(list) - 1 if err != nil {
} fmt.Printf("Open File Err : %v", err)
ImportAsList(mcli, ctx, list[lower:upper], i)
} }
reader := bufio.NewReader(infile)
userIdMaps := make([]*dbstruct.UserIdMap, 0)
for {
str, err := reader.ReadString('\n')
if len(str) == 0 {
continue
}
strs := strings.Split(str, ":")
seq, _ := strconv.Atoi(strs[0])
userId, _ := strconv.Atoi(strs[1])
userIdMaps = append(userIdMaps, &dbstruct.UserIdMap{
Seq: int64(seq),
UserId: int64(userId),
})
if len(userIdMaps) == 1000 {
err := mcli.CreateMappedUserIds(ctx, userIdMaps)
if err != nil {
fmt.Printf("CreateMappedUserIds err :%v", err)
}
}
if err == io.EOF {
err := mcli.CreateMappedUserIds(ctx, userIdMaps)
if err != nil {
fmt.Printf("CreateMappedUserIds err :%v", err)
}
break
}
}
infile.Close()
fmt.Printf("%dth imported", userIdSeq.Seq)
fmt.Printf("Import into test success") fmt.Printf("Import into test success")
} }

View File

@ -32,9 +32,10 @@ func NewMongo() (mongo *Mongo, err error) {
} }
const ( const (
DBUserIdSeq = "user_id_seq" DBUserIdSeq = "user_id_seq"
COLUserIdSeq = "user_id_seq" COLUserIdSeq = "user_id_seq"
COLUserIdMap = "user_id_map" COLUserIdMap = "user_id_map"
COLUserIdImportSeq = "user_id_import_seq"
DBAccountIdSeq = "account_id_seq" DBAccountIdSeq = "account_id_seq"
COLAccountIdSeq = "account_id_seq" COLAccountIdSeq = "account_id_seq"
@ -74,6 +75,10 @@ func (m *Mongo) getColUserIdMap() *qmgo.Collection {
return m.clientMix.Database(DBUserIdSeq).Collection(COLUserIdMap) return m.clientMix.Database(DBUserIdSeq).Collection(COLUserIdMap)
} }
func (m *Mongo) getColUserIdImportSeq() *qmgo.Collection {
return m.clientMix.Database(DBUserIdSeq).Collection(COLUserIdImportSeq)
}
// Account表 // Account表
func (m *Mongo) getColAccount() *qmgo.Collection { func (m *Mongo) getColAccount() *qmgo.Collection {
return m.clientMix.Database(DBAccount).Collection(COLAccount) return m.clientMix.Database(DBAccount).Collection(COLAccount)
@ -336,3 +341,10 @@ func (m *Mongo) GetZoneVasByIds(ctx context.Context, zids []int64) ([]*dbstruct.
} }
return list, nil return list, nil
} }
func (m *Mongo) GetUserIdImportSeq(ctx context.Context) (*dbstruct.UserIdSeq, error) {
col := m.getColUserIdImportSeq()
userIdSeqInstance := dbstruct.UserIdSeq{}
err := col.Find(ctx, qmgo.M{"_id": "seq_id"}).One(&userIdSeqInstance)
return &userIdSeqInstance, err
}