/*
 Copyright 2020 The Qmgo Authors.
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
     http://www.apache.org/licenses/LICENSE-2.0
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
*/
|
|||
|
|
|||
|
package qmgo
|
|||
|
|
|||
|
import (
|
|||
|
"context"
|
|||
|
"fmt"
|
|||
|
"net/url"
|
|||
|
"strings"
|
|||
|
"time"
|
|||
|
|
|||
|
"github.com/qiniu/qmgo/options"
|
|||
|
"go.mongodb.org/mongo-driver/bson"
|
|||
|
"go.mongodb.org/mongo-driver/bson/bsoncodec"
|
|||
|
"go.mongodb.org/mongo-driver/mongo"
|
|||
|
officialOpts "go.mongodb.org/mongo-driver/mongo/options"
|
|||
|
"go.mongodb.org/mongo-driver/mongo/readpref"
|
|||
|
)
|
|||
|
|
|||
|
// Config for initial mongodb instance
|
|||
|
type Config struct {
|
|||
|
// URI example: [mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options]
|
|||
|
// URI Reference: https://docs.mongodb.com/manual/reference/connection-string/
|
|||
|
Uri string `json:"uri"`
|
|||
|
Database string `json:"database"`
|
|||
|
Coll string `json:"coll"`
|
|||
|
// ConnectTimeoutMS specifies a timeout that is used for creating connections to the server.
|
|||
|
// If set to 0, no timeout will be used.
|
|||
|
// The default is 30 seconds.
|
|||
|
ConnectTimeoutMS *int64 `json:"connectTimeoutMS"`
|
|||
|
// MaxPoolSize specifies that maximum number of connections allowed in the driver's connection pool to each server.
|
|||
|
// If this is 0, it will be set to math.MaxInt64,
|
|||
|
// The default is 100.
|
|||
|
MaxPoolSize *uint64 `json:"maxPoolSize"`
|
|||
|
// MinPoolSize specifies the minimum number of connections allowed in the driver's connection pool to each server. If
|
|||
|
// this is non-zero, each server's pool will be maintained in the background to ensure that the size does not fall below
|
|||
|
// the minimum. This can also be set through the "minPoolSize" URI option (e.g. "minPoolSize=100"). The default is 0.
|
|||
|
MinPoolSize *uint64 `json:"minPoolSize"`
|
|||
|
// SocketTimeoutMS specifies how long the driver will wait for a socket read or write to return before returning a
|
|||
|
// network error. If this is 0 meaning no timeout is used and socket operations can block indefinitely.
|
|||
|
// The default is 300,000 ms.
|
|||
|
SocketTimeoutMS *int64 `json:"socketTimeoutMS"`
|
|||
|
// ReadPreference determines which servers are considered suitable for read operations.
|
|||
|
// default is PrimaryMode
|
|||
|
ReadPreference *ReadPref `json:"readPreference"`
|
|||
|
// can be used to provide authentication options when configuring a Client.
|
|||
|
Auth *Credential `json:"auth"`
|
|||
|
}
|
|||
|
|
|||
|
// Credential can be used to provide authentication options when configuring
// a Client.
type Credential struct {
	// AuthMechanism is the mechanism to use for authentication. Supported
	// values include "SCRAM-SHA-256", "SCRAM-SHA-1", "MONGODB-CR", "PLAIN",
	// "GSSAPI", "MONGODB-X509", and "MONGODB-AWS". It can also be set through
	// the "authMechanism" URI option (e.g. "authMechanism=PLAIN").
	// For more information, see
	// https://docs.mongodb.com/manual/core/authentication-mechanisms/.
	AuthMechanism string `json:"authMechanism"`

	// AuthSource is the name of the database to use for authentication.
	// It defaults to "$external" for MONGODB-X509, GSSAPI, and PLAIN, and to
	// "admin" for all other mechanisms. It can also be set through the
	// "authSource" URI option (e.g. "authSource=otherDb").
	AuthSource string `json:"authSource"`

	// Username is the username for authentication. It can also be set
	// through the URI as a username:password pair before the first @
	// character. For example, a URI for user "user", password "pwd", and host
	// "localhost:27017" would be "mongodb://user:pwd@localhost:27017".
	// It is optional for X509 authentication and will be extracted from the
	// client certificate if not specified.
	Username string `json:"username"`

	// Password is the password for authentication. It must not be specified
	// for X509 and is optional for GSSAPI authentication.
	Password string `json:"password"`

	// PasswordSet must, for GSSAPI, be true if a password is specified (even
	// if the password is the empty string) and false if no password is
	// specified, indicating that the password should be taken from the
	// context of the running process. For other mechanisms it is ignored.
	PasswordSet bool `json:"passwordSet"`
}
|
|||
|
|
|||
|
// ReadPref determines which servers are considered suitable for read operations.
|
|||
|
type ReadPref struct {
|
|||
|
// MaxStaleness is the maximum amount of time to allow a server to be considered eligible for selection.
|
|||
|
// Supported from version 3.4.
|
|||
|
MaxStalenessMS int64 `json:"maxStalenessMS"`
|
|||
|
// indicates the user's preference on reads.
|
|||
|
// PrimaryMode as default
|
|||
|
Mode readpref.Mode `json:"mode"`
|
|||
|
}
|
|||
|
|
|||
|
// QmgoClient specifies the instance to operate mongoDB
|
|||
|
type QmgoClient struct {
|
|||
|
*Collection
|
|||
|
*Database
|
|||
|
*Client
|
|||
|
}
|
|||
|
|
|||
|
// Open creates client instance according to config
|
|||
|
// QmgoClient can operates all qmgo.client 、qmgo.database and qmgo.collection
|
|||
|
func Open(ctx context.Context, conf *Config, o ...options.ClientOptions) (cli *QmgoClient, err error) {
|
|||
|
client, err := NewClient(ctx, conf, o...)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("new client fail", err)
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
db := client.Database(conf.Database)
|
|||
|
coll := db.Collection(conf.Coll)
|
|||
|
|
|||
|
cli = &QmgoClient{
|
|||
|
Client: client,
|
|||
|
Database: db,
|
|||
|
Collection: coll,
|
|||
|
}
|
|||
|
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
// Client creates client to mongo
|
|||
|
type Client struct {
|
|||
|
client *mongo.Client
|
|||
|
conf Config
|
|||
|
|
|||
|
registry *bsoncodec.Registry
|
|||
|
}
|
|||
|
|
|||
|
// NewClient creates Qmgo MongoDB client
|
|||
|
func NewClient(ctx context.Context, conf *Config, o ...options.ClientOptions) (cli *Client, err error) {
|
|||
|
opt, err := newConnectOpts(conf, o...)
|
|||
|
if err != nil {
|
|||
|
return nil, err
|
|||
|
}
|
|||
|
client, err := client(ctx, opt)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("new client fail", err)
|
|||
|
return
|
|||
|
}
|
|||
|
cli = &Client{
|
|||
|
client: client,
|
|||
|
conf: *conf,
|
|||
|
registry: opt.Registry,
|
|||
|
}
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
// client creates connection to MongoDB
|
|||
|
func client(ctx context.Context, opt *officialOpts.ClientOptions) (client *mongo.Client, err error) {
|
|||
|
client, err = mongo.Connect(ctx, opt)
|
|||
|
if err != nil {
|
|||
|
fmt.Println(err)
|
|||
|
return
|
|||
|
}
|
|||
|
// half of default connect timeout
|
|||
|
pCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
|||
|
defer cancel()
|
|||
|
if err = client.Ping(pCtx, readpref.Primary()); err != nil {
|
|||
|
fmt.Println(err)
|
|||
|
return
|
|||
|
}
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
// newConnectOpts creates client options from conf
|
|||
|
// Qmgo will follow this way official mongodb driver do:
|
|||
|
// - the configuration in uri takes precedence over the configuration in the setter
|
|||
|
// - Check the validity of the configuration in the uri, while the configuration in the setter is basically not checked
|
|||
|
func newConnectOpts(conf *Config, o ...options.ClientOptions) (*officialOpts.ClientOptions, error) {
|
|||
|
option := officialOpts.Client()
|
|||
|
for _, apply := range o {
|
|||
|
option = officialOpts.MergeClientOptions(apply.ClientOptions)
|
|||
|
}
|
|||
|
if conf.ConnectTimeoutMS != nil {
|
|||
|
timeoutDur := time.Duration(*conf.ConnectTimeoutMS) * time.Millisecond
|
|||
|
option.SetConnectTimeout(timeoutDur)
|
|||
|
|
|||
|
}
|
|||
|
if conf.SocketTimeoutMS != nil {
|
|||
|
timeoutDur := time.Duration(*conf.SocketTimeoutMS) * time.Millisecond
|
|||
|
option.SetSocketTimeout(timeoutDur)
|
|||
|
} else {
|
|||
|
option.SetSocketTimeout(300 * time.Second)
|
|||
|
}
|
|||
|
if conf.MaxPoolSize != nil {
|
|||
|
option.SetMaxPoolSize(*conf.MaxPoolSize)
|
|||
|
}
|
|||
|
if conf.MinPoolSize != nil {
|
|||
|
option.SetMinPoolSize(*conf.MinPoolSize)
|
|||
|
}
|
|||
|
if conf.ReadPreference != nil {
|
|||
|
readPreference, err := newReadPref(*conf.ReadPreference)
|
|||
|
if err != nil {
|
|||
|
return nil, err
|
|||
|
}
|
|||
|
option.SetReadPreference(readPreference)
|
|||
|
}
|
|||
|
if conf.Auth != nil {
|
|||
|
auth, err := newAuth(*conf.Auth)
|
|||
|
if err != nil {
|
|||
|
return nil, err
|
|||
|
}
|
|||
|
option.SetAuth(auth)
|
|||
|
}
|
|||
|
option.ApplyURI(conf.Uri)
|
|||
|
|
|||
|
return option, nil
|
|||
|
}
|
|||
|
|
|||
|
// newAuth create options.Credential from conf.Auth
|
|||
|
func newAuth(auth Credential) (credential officialOpts.Credential, err error) {
|
|||
|
if auth.AuthMechanism != "" {
|
|||
|
credential.AuthMechanism = auth.AuthMechanism
|
|||
|
}
|
|||
|
if auth.AuthSource != "" {
|
|||
|
credential.AuthSource = auth.AuthSource
|
|||
|
}
|
|||
|
if auth.Username != "" {
|
|||
|
// Validate and process the username.
|
|||
|
if strings.Contains(auth.Username, "/") {
|
|||
|
err = ErrNotSupportedUsername
|
|||
|
return
|
|||
|
}
|
|||
|
credential.Username, err = url.QueryUnescape(auth.Username)
|
|||
|
if err != nil {
|
|||
|
err = ErrNotSupportedUsername
|
|||
|
return
|
|||
|
}
|
|||
|
}
|
|||
|
credential.PasswordSet = auth.PasswordSet
|
|||
|
if auth.Password != "" {
|
|||
|
if strings.Contains(auth.Password, ":") {
|
|||
|
err = ErrNotSupportedPassword
|
|||
|
return
|
|||
|
}
|
|||
|
if strings.Contains(auth.Password, "/") {
|
|||
|
err = ErrNotSupportedPassword
|
|||
|
return
|
|||
|
}
|
|||
|
credential.Password, err = url.QueryUnescape(auth.Password)
|
|||
|
if err != nil {
|
|||
|
err = ErrNotSupportedPassword
|
|||
|
return
|
|||
|
}
|
|||
|
credential.Password = auth.Password
|
|||
|
}
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
// newReadPref create readpref.ReadPref from config
|
|||
|
func newReadPref(pref ReadPref) (*readpref.ReadPref, error) {
|
|||
|
readPrefOpts := make([]readpref.Option, 0, 1)
|
|||
|
if pref.MaxStalenessMS != 0 {
|
|||
|
readPrefOpts = append(readPrefOpts, readpref.WithMaxStaleness(time.Duration(pref.MaxStalenessMS)*time.Millisecond))
|
|||
|
}
|
|||
|
mode := readpref.PrimaryMode
|
|||
|
if pref.Mode != 0 {
|
|||
|
mode = pref.Mode
|
|||
|
}
|
|||
|
readPreference, err := readpref.New(mode, readPrefOpts...)
|
|||
|
return readPreference, err
|
|||
|
}
|
|||
|
|
|||
|
// Close closes sockets to the topology referenced by this Client.
|
|||
|
func (c *Client) Close(ctx context.Context) error {
|
|||
|
err := c.client.Disconnect(ctx)
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
// Ping confirm connection is alive
|
|||
|
func (c *Client) Ping(timeout int64) error {
|
|||
|
var err error
|
|||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
|
|||
|
defer cancel()
|
|||
|
|
|||
|
if err = c.client.Ping(ctx, readpref.Primary()); err != nil {
|
|||
|
return err
|
|||
|
}
|
|||
|
return nil
|
|||
|
}
|
|||
|
|
|||
|
// Database create connection to database
|
|||
|
func (c *Client) Database(name string, options ...*options.DatabaseOptions) *Database {
|
|||
|
opts := make([]*officialOpts.DatabaseOptions, 0, len(options))
|
|||
|
for _, o := range options {
|
|||
|
opts = append(opts, o.DatabaseOptions)
|
|||
|
}
|
|||
|
databaseOpts := officialOpts.MergeDatabaseOptions(opts...)
|
|||
|
return &Database{database: c.client.Database(name, databaseOpts), registry: c.registry}
|
|||
|
}
|
|||
|
|
|||
|
// Session create one session on client
|
|||
|
// Watch out, close session after operation done
|
|||
|
func (c *Client) Session(opt ...*options.SessionOptions) (*Session, error) {
|
|||
|
sessionOpts := officialOpts.Session()
|
|||
|
if len(opt) > 0 && opt[0].SessionOptions != nil {
|
|||
|
sessionOpts = opt[0].SessionOptions
|
|||
|
}
|
|||
|
s, err := c.client.StartSession(sessionOpts)
|
|||
|
return &Session{session: s}, err
|
|||
|
}
|
|||
|
|
|||
|
// DoTransaction do whole transaction in one function
|
|||
|
// precondition:
|
|||
|
// - version of mongoDB server >= v4.0
|
|||
|
// - Topology of mongoDB server is not Single
|
|||
|
// At the same time, please pay attention to the following
|
|||
|
// - make sure all operations in callback use the sessCtx as context parameter
|
|||
|
// - if operations in callback takes more than(include equal) 120s, the operations will not take effect,
|
|||
|
// - if operation in callback return qmgo.ErrTransactionRetry,
|
|||
|
// the whole transaction will retry, so this transaction must be idempotent
|
|||
|
// - if operations in callback return qmgo.ErrTransactionNotSupported,
|
|||
|
// - If the ctx parameter already has a Session attached to it, it will be replaced by this session.
|
|||
|
func (c *Client) DoTransaction(ctx context.Context, callback func(sessCtx context.Context) (interface{}, error), opts ...*options.TransactionOptions) (interface{}, error) {
|
|||
|
if !c.transactionAllowed() {
|
|||
|
return nil, ErrTransactionNotSupported
|
|||
|
}
|
|||
|
s, err := c.Session()
|
|||
|
if err != nil {
|
|||
|
return nil, err
|
|||
|
}
|
|||
|
defer s.EndSession(ctx)
|
|||
|
return s.StartTransaction(ctx, callback, opts...)
|
|||
|
}
|
|||
|
|
|||
|
// ServerVersion get the version of mongoDB server, like 4.4.0
|
|||
|
func (c *Client) ServerVersion() string {
|
|||
|
var buildInfo bson.Raw
|
|||
|
err := c.client.Database("admin").RunCommand(
|
|||
|
context.Background(),
|
|||
|
bson.D{{"buildInfo", 1}},
|
|||
|
).Decode(&buildInfo)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("run command err", err)
|
|||
|
return ""
|
|||
|
}
|
|||
|
v, err := buildInfo.LookupErr("version")
|
|||
|
if err != nil {
|
|||
|
fmt.Println("look up err", err)
|
|||
|
return ""
|
|||
|
}
|
|||
|
return v.StringValue()
|
|||
|
}
|
|||
|
|
|||
|
// transactionAllowed check if transaction is allowed
|
|||
|
func (c *Client) transactionAllowed() bool {
|
|||
|
vr, err := CompareVersions("4.0", c.ServerVersion())
|
|||
|
if err != nil {
|
|||
|
return false
|
|||
|
}
|
|||
|
if vr > 0 {
|
|||
|
fmt.Println("transaction is not supported because mongo server version is below 4.0")
|
|||
|
return false
|
|||
|
}
|
|||
|
// TODO dont know why need to do `cli, err := Open(ctx, &c.conf)` in topology() to get topo,
|
|||
|
// Before figure it out, we only use this function in UT
|
|||
|
//topo, err := c.topology()
|
|||
|
//if topo == description.Single {
|
|||
|
// fmt.Println("transaction is not supported because mongo server topology is single")
|
|||
|
// return false
|
|||
|
//}
|
|||
|
return true
|
|||
|
}
|