436 lines
12 KiB
Go
436 lines
12 KiB
Go
/*
 Copyright 2020 The Qmgo Authors.
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
     http://www.apache.org/licenses/LICENSE-2.0
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
*/
|
||
|
||
package qmgo
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"reflect"
|
||
|
||
"github.com/qiniu/qmgo/middleware"
|
||
"github.com/qiniu/qmgo/operator"
|
||
qOpts "github.com/qiniu/qmgo/options"
|
||
"go.mongodb.org/mongo-driver/bson"
|
||
"go.mongodb.org/mongo-driver/bson/bsoncodec"
|
||
"go.mongodb.org/mongo-driver/mongo"
|
||
"go.mongodb.org/mongo-driver/mongo/options"
|
||
)
|
||
|
||
// Query struct definition
|
||
type Query struct {
|
||
filter interface{}
|
||
sort interface{}
|
||
project interface{}
|
||
hint interface{}
|
||
arrayFilters *options.ArrayFilters
|
||
limit *int64
|
||
skip *int64
|
||
batchSize *int64
|
||
noCursorTimeout *bool
|
||
collation *options.Collation
|
||
|
||
ctx context.Context
|
||
collection *mongo.Collection
|
||
opts []qOpts.FindOptions
|
||
registry *bsoncodec.Registry
|
||
}
|
||
|
||
func (q *Query) Collation(collation *options.Collation) QueryI {
|
||
newQ := q
|
||
newQ.collation = collation
|
||
return newQ
|
||
}
|
||
|
||
func (q *Query) NoCursorTimeout(n bool) QueryI {
|
||
newQ := q
|
||
newQ.noCursorTimeout = &n
|
||
return newQ
|
||
}
|
||
|
||
// BatchSize sets the value for the BatchSize field.
|
||
// Means the maximum number of documents to be included in each batch returned by the server.
|
||
func (q *Query) BatchSize(n int64) QueryI {
|
||
newQ := q
|
||
newQ.batchSize = &n
|
||
return newQ
|
||
}
|
||
|
||
// Sort is Used to set the sorting rules for the returned results
|
||
// Format: "age" or "+age" means to sort the age field in ascending order, "-age" means in descending order
|
||
// When multiple sort fields are passed in at the same time, they are arranged in the order in which the fields are passed in.
|
||
// For example, {"age", "-name"}, first sort by age in ascending order, then sort by name in descending order
|
||
func (q *Query) Sort(fields ...string) QueryI {
|
||
if len(fields) == 0 {
|
||
// A nil bson.D will not correctly serialize, but this case is no-op
|
||
// so an early return will do.
|
||
return q
|
||
}
|
||
|
||
var sorts bson.D
|
||
for _, field := range fields {
|
||
key, n := SplitSortField(field)
|
||
if key == "" {
|
||
panic("Sort: empty field name")
|
||
}
|
||
sorts = append(sorts, bson.E{Key: key, Value: n})
|
||
}
|
||
newQ := q
|
||
newQ.sort = sorts
|
||
return newQ
|
||
}
|
||
|
||
// SetArrayFilter use for apply update array
|
||
// For Example :
|
||
// var res = QueryTestItem{}
|
||
// change := Change{
|
||
// Update: bson.M{"$set": bson.M{"instock.$[elem].qty": 100}},
|
||
// ReturnNew: false,
|
||
// }
|
||
// cli.Find(context.Background(), bson.M{"name": "Lucas"}).
|
||
// SetArrayFilters(&options.ArrayFilters{Filters: []interface{}{bson.M{"elem.warehouse": bson.M{"$in": []string{"C", "F"}}},}}).
|
||
// Apply(change, &res)
|
||
func (q *Query) SetArrayFilters(filter *options.ArrayFilters) QueryI {
|
||
newQ := q
|
||
newQ.arrayFilters = filter
|
||
return newQ
|
||
}
|
||
|
||
// Select is used to determine which fields are displayed or not displayed in the returned results
|
||
// Format: bson.M{"age": 1} means that only the age field is displayed
|
||
// bson.M{"age": 0} means to display other fields except age
|
||
// When _id is not displayed and is set to 0, it will be returned to display
|
||
func (q *Query) Select(projection interface{}) QueryI {
|
||
newQ := q
|
||
newQ.project = projection
|
||
return newQ
|
||
}
|
||
|
||
// Skip skip n records
|
||
func (q *Query) Skip(n int64) QueryI {
|
||
newQ := q
|
||
newQ.skip = &n
|
||
return newQ
|
||
}
|
||
|
||
// Hint sets the value for the Hint field.
|
||
// This should either be the index name as a string or the index specification
|
||
// as a document. The default value is nil, which means that no hint will be sent.
|
||
func (q *Query) Hint(hint interface{}) QueryI {
|
||
newQ := q
|
||
newQ.hint = hint
|
||
return newQ
|
||
}
|
||
|
||
// Limit limits the maximum number of documents found to n
|
||
// The default value is 0, and 0 means no limit, and all matching results are returned
|
||
// When the limit value is less than 0, the negative limit is similar to the positive limit, but the cursor is closed after returning a single batch result.
|
||
// Reference https://docs.mongodb.com/manual/reference/method/cursor.limit/index.html
|
||
func (q *Query) Limit(n int64) QueryI {
|
||
newQ := q
|
||
newQ.limit = &n
|
||
return newQ
|
||
}
|
||
|
||
// One query a record that meets the filter conditions
|
||
// If the search fails, an error will be returned
|
||
func (q *Query) One(result interface{}) error {
|
||
if len(q.opts) > 0 {
|
||
if err := middleware.Do(q.ctx, q.opts[0].QueryHook, operator.BeforeQuery); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
opt := options.FindOne()
|
||
|
||
if q.collation != nil {
|
||
opt.SetCollation(q.collation)
|
||
}
|
||
if q.sort != nil {
|
||
opt.SetSort(q.sort)
|
||
}
|
||
if q.project != nil {
|
||
opt.SetProjection(q.project)
|
||
}
|
||
if q.skip != nil {
|
||
opt.SetSkip(*q.skip)
|
||
}
|
||
if q.hint != nil {
|
||
opt.SetHint(q.hint)
|
||
}
|
||
|
||
err := q.collection.FindOne(q.ctx, q.filter, opt).Decode(result)
|
||
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if len(q.opts) > 0 {
|
||
if err := middleware.Do(q.ctx, q.opts[0].QueryHook, operator.AfterQuery); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// All query multiple records that meet the filter conditions
|
||
// The static type of result must be a slice pointer
|
||
func (q *Query) All(result interface{}) error {
|
||
if len(q.opts) > 0 {
|
||
if err := middleware.Do(q.ctx, q.opts[0].QueryHook, operator.BeforeQuery); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
opt := options.Find()
|
||
if q.collation != nil {
|
||
opt.SetCollation(q.collation)
|
||
}
|
||
if q.sort != nil {
|
||
opt.SetSort(q.sort)
|
||
}
|
||
if q.project != nil {
|
||
opt.SetProjection(q.project)
|
||
}
|
||
if q.limit != nil {
|
||
opt.SetLimit(*q.limit)
|
||
}
|
||
if q.skip != nil {
|
||
opt.SetSkip(*q.skip)
|
||
}
|
||
if q.hint != nil {
|
||
opt.SetHint(q.hint)
|
||
}
|
||
if q.batchSize != nil {
|
||
opt.SetBatchSize(int32(*q.batchSize))
|
||
}
|
||
if q.noCursorTimeout != nil {
|
||
opt.SetNoCursorTimeout(*q.noCursorTimeout)
|
||
}
|
||
|
||
var err error
|
||
var cursor *mongo.Cursor
|
||
|
||
cursor, err = q.collection.Find(q.ctx, q.filter, opt)
|
||
|
||
c := Cursor{
|
||
ctx: q.ctx,
|
||
cursor: cursor,
|
||
err: err,
|
||
}
|
||
err = c.All(result)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if len(q.opts) > 0 {
|
||
if err := middleware.Do(q.ctx, q.opts[0].QueryHook, operator.AfterQuery); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Count count the number of eligible entries
|
||
func (q *Query) Count() (n int64, err error) {
|
||
opt := options.Count()
|
||
|
||
if q.limit != nil {
|
||
opt.SetLimit(*q.limit)
|
||
}
|
||
if q.skip != nil {
|
||
opt.SetSkip(*q.skip)
|
||
}
|
||
|
||
return q.collection.CountDocuments(q.ctx, q.filter, opt)
|
||
}
|
||
|
||
// EstimatedCount count the number of the collection by using the metadata
|
||
func (q *Query) EstimatedCount() (n int64, err error) {
|
||
return q.collection.EstimatedDocumentCount(q.ctx)
|
||
}
|
||
|
||
// Distinct gets the unique value of the specified field in the collection and return it in the form of slice
|
||
// result should be passed a pointer to slice
|
||
// The function will verify whether the static type of the elements in the result slice is consistent with the data type obtained in mongodb
|
||
// reference https://docs.mongodb.com/manual/reference/command/distinct/
|
||
func (q *Query) Distinct(key string, result interface{}) error {
|
||
resultVal := reflect.ValueOf(result)
|
||
|
||
if resultVal.Kind() != reflect.Ptr {
|
||
return ErrQueryNotSlicePointer
|
||
}
|
||
|
||
resultElmVal := resultVal.Elem()
|
||
if resultElmVal.Kind() != reflect.Interface && resultElmVal.Kind() != reflect.Slice {
|
||
return ErrQueryNotSliceType
|
||
}
|
||
|
||
opt := options.Distinct()
|
||
res, err := q.collection.Distinct(q.ctx, key, q.filter, opt)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
registry := q.registry
|
||
if registry == nil {
|
||
registry = bson.DefaultRegistry
|
||
}
|
||
valueType, valueBytes, err_ := bson.MarshalValueWithRegistry(registry, res)
|
||
if err_ != nil {
|
||
fmt.Printf("bson.MarshalValue err: %+v\n", err_)
|
||
return err_
|
||
}
|
||
|
||
rawValue := bson.RawValue{Type: valueType, Value: valueBytes}
|
||
err = rawValue.Unmarshal(result)
|
||
if err != nil {
|
||
fmt.Printf("rawValue.Unmarshal err: %+v\n", err)
|
||
return ErrQueryResultTypeInconsistent
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// Cursor gets a Cursor object, which can be used to traverse the query result set
|
||
// After obtaining the CursorI object, you should actively call the Close interface to close the cursor
|
||
func (q *Query) Cursor() CursorI {
|
||
opt := options.Find()
|
||
|
||
if q.sort != nil {
|
||
opt.SetSort(q.sort)
|
||
}
|
||
if q.project != nil {
|
||
opt.SetProjection(q.project)
|
||
}
|
||
if q.limit != nil {
|
||
opt.SetLimit(*q.limit)
|
||
}
|
||
if q.skip != nil {
|
||
opt.SetSkip(*q.skip)
|
||
}
|
||
|
||
if q.batchSize != nil {
|
||
opt.SetBatchSize(int32(*q.batchSize))
|
||
}
|
||
if q.noCursorTimeout != nil {
|
||
opt.SetNoCursorTimeout(*q.noCursorTimeout)
|
||
}
|
||
|
||
var err error
|
||
var cur *mongo.Cursor
|
||
cur, err = q.collection.Find(q.ctx, q.filter, opt)
|
||
return &Cursor{
|
||
ctx: q.ctx,
|
||
cursor: cur,
|
||
err: err,
|
||
}
|
||
}
|
||
|
||
// Apply runs the findAndModify command, which allows updating, replacing
|
||
// or removing a document matching a query and atomically returning either the old
|
||
// version (the default) or the new version of the document (when ReturnNew is true)
|
||
//
|
||
// The Sort and Select query methods affect the result of Apply. In case
|
||
// multiple documents match the query, Sort enables selecting which document to
|
||
// act upon by ordering it first. Select enables retrieving only a selection
|
||
// of fields of the new or old document.
|
||
//
|
||
// When Change.Replace is true, it means replace at most one document in the collection
|
||
// and the update parameter must be a document and cannot contain any update operators;
|
||
// if no objects are found and Change.Upsert is false, it will returns ErrNoDocuments.
|
||
// When Change.Remove is true, it means delete at most one document in the collection
|
||
// and returns the document as it appeared before deletion; if no objects are found,
|
||
// it will returns ErrNoDocuments.
|
||
// When both Change.Replace and Change.Remove are false,it means update at most one document
|
||
// in the collection and the update parameter must be a document containing update operators;
|
||
// if no objects are found and Change.Upsert is false, it will returns ErrNoDocuments.
|
||
//
|
||
// reference: https://docs.mongodb.com/manual/reference/command/findAndModify/
|
||
func (q *Query) Apply(change Change, result interface{}) error {
|
||
var err error
|
||
|
||
if change.Remove {
|
||
err = q.findOneAndDelete(change, result)
|
||
} else if change.Replace {
|
||
err = q.findOneAndReplace(change, result)
|
||
} else {
|
||
err = q.findOneAndUpdate(change, result)
|
||
}
|
||
|
||
return err
|
||
}
|
||
|
||
// findOneAndDelete
|
||
// reference: https://docs.mongodb.com/manual/reference/method/db.collection.findOneAndDelete/
|
||
func (q *Query) findOneAndDelete(change Change, result interface{}) error {
|
||
opts := options.FindOneAndDelete()
|
||
if q.sort != nil {
|
||
opts.SetSort(q.sort)
|
||
}
|
||
if q.project != nil {
|
||
opts.SetProjection(q.project)
|
||
}
|
||
|
||
return q.collection.FindOneAndDelete(q.ctx, q.filter, opts).Decode(result)
|
||
}
|
||
|
||
// findOneAndReplace
|
||
// reference: https://docs.mongodb.com/manual/reference/method/db.collection.findOneAndReplace/
|
||
func (q *Query) findOneAndReplace(change Change, result interface{}) error {
|
||
opts := options.FindOneAndReplace()
|
||
if q.sort != nil {
|
||
opts.SetSort(q.sort)
|
||
}
|
||
if q.project != nil {
|
||
opts.SetProjection(q.project)
|
||
}
|
||
if change.Upsert {
|
||
opts.SetUpsert(change.Upsert)
|
||
}
|
||
if change.ReturnNew {
|
||
opts.SetReturnDocument(options.After)
|
||
}
|
||
|
||
err := q.collection.FindOneAndReplace(q.ctx, q.filter, change.Update, opts).Decode(result)
|
||
if change.Upsert && !change.ReturnNew && err == mongo.ErrNoDocuments {
|
||
return nil
|
||
}
|
||
|
||
return err
|
||
}
|
||
|
||
// findOneAndUpdate
|
||
// reference: https://docs.mongodb.com/manual/reference/method/db.collection.findOneAndUpdate/
|
||
func (q *Query) findOneAndUpdate(change Change, result interface{}) error {
|
||
opts := options.FindOneAndUpdate()
|
||
if q.sort != nil {
|
||
opts.SetSort(q.sort)
|
||
}
|
||
if q.project != nil {
|
||
opts.SetProjection(q.project)
|
||
}
|
||
if change.Upsert {
|
||
opts.SetUpsert(change.Upsert)
|
||
}
|
||
if change.ReturnNew {
|
||
opts.SetReturnDocument(options.After)
|
||
}
|
||
|
||
if q.arrayFilters != nil {
|
||
opts.SetArrayFilters(*q.arrayFilters)
|
||
}
|
||
|
||
err := q.collection.FindOneAndUpdate(q.ctx, q.filter, change.Update, opts).Decode(result)
|
||
if change.Upsert && !change.ReturnNew && err == mongo.ErrNoDocuments {
|
||
return nil
|
||
}
|
||
|
||
return err
|
||
}
|