706 lines
22 KiB
Go
706 lines
22 KiB
Go
|
// Copyright (C) MongoDB, Inc. 2017-present.
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||
|
// not use this file except in compliance with the License. You may obtain
|
||
|
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
|
||
|
|
||
|
package mongo
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"reflect"
|
||
|
"strconv"
|
||
|
"time"
|
||
|
|
||
|
"go.mongodb.org/mongo-driver/bson"
|
||
|
"go.mongodb.org/mongo-driver/bson/bsoncodec"
|
||
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||
|
"go.mongodb.org/mongo-driver/internal"
|
||
|
"go.mongodb.org/mongo-driver/mongo/description"
|
||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||
|
"go.mongodb.org/mongo-driver/mongo/readconcern"
|
||
|
"go.mongodb.org/mongo-driver/mongo/readpref"
|
||
|
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
|
||
|
"go.mongodb.org/mongo-driver/x/mongo/driver"
|
||
|
"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
|
||
|
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
// ErrMissingResumeToken indicates that a change stream notification from the server did not contain a resume token.
|
||
|
ErrMissingResumeToken = errors.New("cannot provide resume functionality when the resume token is missing")
|
||
|
// ErrNilCursor indicates that the underlying cursor for the change stream is nil.
|
||
|
ErrNilCursor = errors.New("cursor is nil")
|
||
|
|
||
|
minResumableLabelWireVersion int32 = 9 // Wire version at which the server includes the resumable error label
|
||
|
networkErrorLabel = "NetworkError"
|
||
|
resumableErrorLabel = "ResumableChangeStreamError"
|
||
|
errorCursorNotFound int32 = 43 // CursorNotFound error code
|
||
|
|
||
|
// Allowlist of error codes that are considered resumable.
|
||
|
resumableChangeStreamErrors = map[int32]struct{}{
|
||
|
6: {}, // HostUnreachable
|
||
|
7: {}, // HostNotFound
|
||
|
89: {}, // NetworkTimeout
|
||
|
91: {}, // ShutdownInProgress
|
||
|
189: {}, // PrimarySteppedDown
|
||
|
262: {}, // ExceededTimeLimit
|
||
|
9001: {}, // SocketException
|
||
|
10107: {}, // NotPrimary
|
||
|
11600: {}, // InterruptedAtShutdown
|
||
|
11602: {}, // InterruptedDueToReplStateChange
|
||
|
13435: {}, // NotPrimaryNoSecondaryOK
|
||
|
13436: {}, // NotPrimaryOrSecondary
|
||
|
63: {}, // StaleShardVersion
|
||
|
150: {}, // StaleEpoch
|
||
|
13388: {}, // StaleConfig
|
||
|
234: {}, // RetryChangeStream
|
||
|
133: {}, // FailedToSatisfyReadPreference
|
||
|
}
|
||
|
)
|
||
|
|
||
|
// ChangeStream is used to iterate over a stream of events. Each event can be decoded into a Go type via the Decode
|
||
|
// method or accessed as raw BSON via the Current field. This type is not goroutine safe and must not be used
|
||
|
// concurrently by multiple goroutines. For more information about change streams, see
|
||
|
// https://www.mongodb.com/docs/manual/changeStreams/.
|
||
|
type ChangeStream struct {
|
||
|
// Current is the BSON bytes of the current event. This property is only valid until the next call to Next or
|
||
|
// TryNext. If continued access is required, a copy must be made.
|
||
|
Current bson.Raw
|
||
|
|
||
|
aggregate *operation.Aggregate
|
||
|
pipelineSlice []bsoncore.Document
|
||
|
pipelineOptions map[string]bsoncore.Value
|
||
|
cursor changeStreamCursor
|
||
|
cursorOptions driver.CursorOptions
|
||
|
batch []bsoncore.Document
|
||
|
resumeToken bson.Raw
|
||
|
err error
|
||
|
sess *session.Client
|
||
|
client *Client
|
||
|
registry *bsoncodec.Registry
|
||
|
streamType StreamType
|
||
|
options *options.ChangeStreamOptions
|
||
|
selector description.ServerSelector
|
||
|
operationTime *primitive.Timestamp
|
||
|
wireVersion *description.VersionRange
|
||
|
}
|
||
|
|
||
|
type changeStreamConfig struct {
|
||
|
readConcern *readconcern.ReadConcern
|
||
|
readPreference *readpref.ReadPref
|
||
|
client *Client
|
||
|
registry *bsoncodec.Registry
|
||
|
streamType StreamType
|
||
|
collectionName string
|
||
|
databaseName string
|
||
|
crypt driver.Crypt
|
||
|
}
|
||
|
|
||
|
func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline interface{},
|
||
|
opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
|
||
|
if ctx == nil {
|
||
|
ctx = context.Background()
|
||
|
}
|
||
|
|
||
|
cs := &ChangeStream{
|
||
|
client: config.client,
|
||
|
registry: config.registry,
|
||
|
streamType: config.streamType,
|
||
|
options: options.MergeChangeStreamOptions(opts...),
|
||
|
selector: description.CompositeSelector([]description.ServerSelector{
|
||
|
description.ReadPrefSelector(config.readPreference),
|
||
|
description.LatencySelector(config.client.localThreshold),
|
||
|
}),
|
||
|
cursorOptions: config.client.createBaseCursorOptions(),
|
||
|
}
|
||
|
|
||
|
cs.sess = sessionFromContext(ctx)
|
||
|
if cs.sess == nil && cs.client.sessionPool != nil {
|
||
|
cs.sess = session.NewImplicitClientSession(cs.client.sessionPool, cs.client.id)
|
||
|
}
|
||
|
if cs.err = cs.client.validSession(cs.sess); cs.err != nil {
|
||
|
closeImplicitSession(cs.sess)
|
||
|
return nil, cs.Err()
|
||
|
}
|
||
|
|
||
|
cs.aggregate = operation.NewAggregate(nil).
|
||
|
ReadPreference(config.readPreference).ReadConcern(config.readConcern).
|
||
|
Deployment(cs.client.deployment).ClusterClock(cs.client.clock).
|
||
|
CommandMonitor(cs.client.monitor).Session(cs.sess).ServerSelector(cs.selector).Retry(driver.RetryNone).
|
||
|
ServerAPI(cs.client.serverAPI).Crypt(config.crypt).Timeout(cs.client.timeout)
|
||
|
|
||
|
if cs.options.Collation != nil {
|
||
|
cs.aggregate.Collation(bsoncore.Document(cs.options.Collation.ToDocument()))
|
||
|
}
|
||
|
if comment := cs.options.Comment; comment != nil {
|
||
|
cs.aggregate.Comment(*comment)
|
||
|
|
||
|
commentVal, err := transformValue(cs.registry, comment, true, "comment")
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
cs.cursorOptions.Comment = commentVal
|
||
|
}
|
||
|
if cs.options.BatchSize != nil {
|
||
|
cs.aggregate.BatchSize(*cs.options.BatchSize)
|
||
|
cs.cursorOptions.BatchSize = *cs.options.BatchSize
|
||
|
}
|
||
|
if cs.options.MaxAwaitTime != nil {
|
||
|
cs.cursorOptions.MaxTimeMS = int64(*cs.options.MaxAwaitTime / time.Millisecond)
|
||
|
}
|
||
|
if cs.options.Custom != nil {
|
||
|
// Marshal all custom options before passing to the initial aggregate. Return
|
||
|
// any errors from Marshaling.
|
||
|
customOptions := make(map[string]bsoncore.Value)
|
||
|
for optionName, optionValue := range cs.options.Custom {
|
||
|
bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
|
||
|
if err != nil {
|
||
|
cs.err = err
|
||
|
closeImplicitSession(cs.sess)
|
||
|
return nil, cs.Err()
|
||
|
}
|
||
|
optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
|
||
|
customOptions[optionName] = optionValueBSON
|
||
|
}
|
||
|
cs.aggregate.CustomOptions(customOptions)
|
||
|
}
|
||
|
if cs.options.CustomPipeline != nil {
|
||
|
// Marshal all custom pipeline options before building pipeline slice. Return
|
||
|
// any errors from Marshaling.
|
||
|
cs.pipelineOptions = make(map[string]bsoncore.Value)
|
||
|
for optionName, optionValue := range cs.options.CustomPipeline {
|
||
|
bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
|
||
|
if err != nil {
|
||
|
cs.err = err
|
||
|
closeImplicitSession(cs.sess)
|
||
|
return nil, cs.Err()
|
||
|
}
|
||
|
optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
|
||
|
cs.pipelineOptions[optionName] = optionValueBSON
|
||
|
}
|
||
|
}
|
||
|
|
||
|
switch cs.streamType {
|
||
|
case ClientStream:
|
||
|
cs.aggregate.Database("admin")
|
||
|
case DatabaseStream:
|
||
|
cs.aggregate.Database(config.databaseName)
|
||
|
case CollectionStream:
|
||
|
cs.aggregate.Collection(config.collectionName).Database(config.databaseName)
|
||
|
default:
|
||
|
closeImplicitSession(cs.sess)
|
||
|
return nil, fmt.Errorf("must supply a valid StreamType in config, instead of %v", cs.streamType)
|
||
|
}
|
||
|
|
||
|
// When starting a change stream, cache startAfter as the first resume token if it is set. If not, cache
|
||
|
// resumeAfter. If neither is set, do not cache a resume token.
|
||
|
resumeToken := cs.options.StartAfter
|
||
|
if resumeToken == nil {
|
||
|
resumeToken = cs.options.ResumeAfter
|
||
|
}
|
||
|
var marshaledToken bson.Raw
|
||
|
if resumeToken != nil {
|
||
|
if marshaledToken, cs.err = bson.Marshal(resumeToken); cs.err != nil {
|
||
|
closeImplicitSession(cs.sess)
|
||
|
return nil, cs.Err()
|
||
|
}
|
||
|
}
|
||
|
cs.resumeToken = marshaledToken
|
||
|
|
||
|
if cs.err = cs.buildPipelineSlice(pipeline); cs.err != nil {
|
||
|
closeImplicitSession(cs.sess)
|
||
|
return nil, cs.Err()
|
||
|
}
|
||
|
var pipelineArr bsoncore.Document
|
||
|
pipelineArr, cs.err = cs.pipelineToBSON()
|
||
|
cs.aggregate.Pipeline(pipelineArr)
|
||
|
|
||
|
if cs.err = cs.executeOperation(ctx, false); cs.err != nil {
|
||
|
closeImplicitSession(cs.sess)
|
||
|
return nil, cs.Err()
|
||
|
}
|
||
|
|
||
|
return cs, cs.Err()
|
||
|
}
|
||
|
|
||
|
func (cs *ChangeStream) createOperationDeployment(server driver.Server, connection driver.Connection) driver.Deployment {
|
||
|
return &changeStreamDeployment{
|
||
|
topologyKind: cs.client.deployment.Kind(),
|
||
|
server: server,
|
||
|
conn: connection,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error {
|
||
|
var server driver.Server
|
||
|
var conn driver.Connection
|
||
|
|
||
|
if server, cs.err = cs.client.deployment.SelectServer(ctx, cs.selector); cs.err != nil {
|
||
|
return cs.Err()
|
||
|
}
|
||
|
if conn, cs.err = server.Connection(ctx); cs.err != nil {
|
||
|
return cs.Err()
|
||
|
}
|
||
|
defer conn.Close()
|
||
|
cs.wireVersion = conn.Description().WireVersion
|
||
|
|
||
|
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
|
||
|
|
||
|
if resuming {
|
||
|
cs.replaceOptions(cs.wireVersion)
|
||
|
|
||
|
csOptDoc, err := cs.createPipelineOptionsDoc()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
pipIdx, pipDoc := bsoncore.AppendDocumentStart(nil)
|
||
|
pipDoc = bsoncore.AppendDocumentElement(pipDoc, "$changeStream", csOptDoc)
|
||
|
if pipDoc, cs.err = bsoncore.AppendDocumentEnd(pipDoc, pipIdx); cs.err != nil {
|
||
|
return cs.Err()
|
||
|
}
|
||
|
cs.pipelineSlice[0] = pipDoc
|
||
|
|
||
|
var plArr bsoncore.Document
|
||
|
if plArr, cs.err = cs.pipelineToBSON(); cs.err != nil {
|
||
|
return cs.Err()
|
||
|
}
|
||
|
cs.aggregate.Pipeline(plArr)
|
||
|
}
|
||
|
|
||
|
// If no deadline is set on the passed-in context, cs.client.timeout is set, and context is not already
|
||
|
// a Timeout context, honor cs.client.timeout in new Timeout context for change stream operation execution
|
||
|
// and potential retry.
|
||
|
if _, deadlineSet := ctx.Deadline(); !deadlineSet && cs.client.timeout != nil && !internal.IsTimeoutContext(ctx) {
|
||
|
newCtx, cancelFunc := internal.MakeTimeoutContext(ctx, *cs.client.timeout)
|
||
|
// Redefine ctx to be the new timeout-derived context.
|
||
|
ctx = newCtx
|
||
|
// Cancel the timeout-derived context at the end of executeOperation to avoid a context leak.
|
||
|
defer cancelFunc()
|
||
|
}
|
||
|
|
||
|
// Execute the aggregate, retrying on retryable errors once (1) if retryable reads are enabled and
|
||
|
// infinitely (-1) if context is a Timeout context.
|
||
|
var retries int
|
||
|
if cs.client.retryReads {
|
||
|
retries = 1
|
||
|
}
|
||
|
if internal.IsTimeoutContext(ctx) {
|
||
|
retries = -1
|
||
|
}
|
||
|
|
||
|
var err error
|
||
|
AggregateExecuteLoop:
|
||
|
for {
|
||
|
err = cs.aggregate.Execute(ctx)
|
||
|
// If no error or no retries remain, do not retry.
|
||
|
if err == nil || retries == 0 {
|
||
|
break AggregateExecuteLoop
|
||
|
}
|
||
|
|
||
|
switch tt := err.(type) {
|
||
|
case driver.Error:
|
||
|
// If error is not retryable, do not retry.
|
||
|
if !tt.RetryableRead() {
|
||
|
break AggregateExecuteLoop
|
||
|
}
|
||
|
|
||
|
// If error is retryable: subtract 1 from retries, redo server selection, checkout
|
||
|
// a connection, and restart loop.
|
||
|
retries--
|
||
|
server, err = cs.client.deployment.SelectServer(ctx, cs.selector)
|
||
|
if err != nil {
|
||
|
break AggregateExecuteLoop
|
||
|
}
|
||
|
|
||
|
conn.Close()
|
||
|
conn, err = server.Connection(ctx)
|
||
|
if err != nil {
|
||
|
break AggregateExecuteLoop
|
||
|
}
|
||
|
defer conn.Close()
|
||
|
|
||
|
// Update the wire version with data from the new connection.
|
||
|
cs.wireVersion = conn.Description().WireVersion
|
||
|
|
||
|
// Reset deployment.
|
||
|
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
|
||
|
default:
|
||
|
// Do not retry if error is not a driver error.
|
||
|
break AggregateExecuteLoop
|
||
|
}
|
||
|
}
|
||
|
if err != nil {
|
||
|
cs.err = replaceErrors(err)
|
||
|
return cs.err
|
||
|
}
|
||
|
|
||
|
cr := cs.aggregate.ResultCursorResponse()
|
||
|
cr.Server = server
|
||
|
|
||
|
cs.cursor, cs.err = driver.NewBatchCursor(cr, cs.sess, cs.client.clock, cs.cursorOptions)
|
||
|
if cs.err = replaceErrors(cs.err); cs.err != nil {
|
||
|
return cs.Err()
|
||
|
}
|
||
|
|
||
|
cs.updatePbrtFromCommand()
|
||
|
if cs.options.StartAtOperationTime == nil && cs.options.ResumeAfter == nil &&
|
||
|
cs.options.StartAfter == nil && cs.wireVersion.Max >= 7 &&
|
||
|
cs.emptyBatch() && cs.resumeToken == nil {
|
||
|
cs.operationTime = cs.sess.OperationTime
|
||
|
}
|
||
|
|
||
|
return cs.Err()
|
||
|
}
|
||
|
|
||
|
// Updates the post batch resume token after a successful aggregate or getMore operation.
|
||
|
func (cs *ChangeStream) updatePbrtFromCommand() {
|
||
|
// Only cache the pbrt if an empty batch was returned and a pbrt was included
|
||
|
if pbrt := cs.cursor.PostBatchResumeToken(); cs.emptyBatch() && pbrt != nil {
|
||
|
cs.resumeToken = bson.Raw(pbrt)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (cs *ChangeStream) storeResumeToken() error {
|
||
|
// If cs.Current is the last document in the batch and a pbrt is included, cache the pbrt
|
||
|
// Otherwise, cache the _id of the document
|
||
|
var tokenDoc bson.Raw
|
||
|
if len(cs.batch) == 0 {
|
||
|
if pbrt := cs.cursor.PostBatchResumeToken(); pbrt != nil {
|
||
|
tokenDoc = bson.Raw(pbrt)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if tokenDoc == nil {
|
||
|
var ok bool
|
||
|
tokenDoc, ok = cs.Current.Lookup("_id").DocumentOK()
|
||
|
if !ok {
|
||
|
_ = cs.Close(context.Background())
|
||
|
return ErrMissingResumeToken
|
||
|
}
|
||
|
}
|
||
|
|
||
|
cs.resumeToken = tokenDoc
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (cs *ChangeStream) buildPipelineSlice(pipeline interface{}) error {
|
||
|
val := reflect.ValueOf(pipeline)
|
||
|
if !val.IsValid() || !(val.Kind() == reflect.Slice) {
|
||
|
cs.err = errors.New("can only transform slices and arrays into aggregation pipelines, but got invalid")
|
||
|
return cs.err
|
||
|
}
|
||
|
|
||
|
cs.pipelineSlice = make([]bsoncore.Document, 0, val.Len()+1)
|
||
|
|
||
|
csIdx, csDoc := bsoncore.AppendDocumentStart(nil)
|
||
|
|
||
|
csDocTemp, err := cs.createPipelineOptionsDoc()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
csDoc = bsoncore.AppendDocumentElement(csDoc, "$changeStream", csDocTemp)
|
||
|
csDoc, cs.err = bsoncore.AppendDocumentEnd(csDoc, csIdx)
|
||
|
if cs.err != nil {
|
||
|
return cs.err
|
||
|
}
|
||
|
cs.pipelineSlice = append(cs.pipelineSlice, csDoc)
|
||
|
|
||
|
for i := 0; i < val.Len(); i++ {
|
||
|
var elem []byte
|
||
|
elem, cs.err = transformBsoncoreDocument(cs.registry, val.Index(i).Interface(), true, fmt.Sprintf("pipeline stage :%v", i))
|
||
|
if cs.err != nil {
|
||
|
return cs.err
|
||
|
}
|
||
|
|
||
|
cs.pipelineSlice = append(cs.pipelineSlice, elem)
|
||
|
}
|
||
|
|
||
|
return cs.err
|
||
|
}
|
||
|
|
||
|
func (cs *ChangeStream) createPipelineOptionsDoc() (bsoncore.Document, error) {
|
||
|
plDocIdx, plDoc := bsoncore.AppendDocumentStart(nil)
|
||
|
|
||
|
if cs.streamType == ClientStream {
|
||
|
plDoc = bsoncore.AppendBooleanElement(plDoc, "allChangesForCluster", true)
|
||
|
}
|
||
|
|
||
|
if cs.options.FullDocument != nil && *cs.options.FullDocument != options.Default {
|
||
|
plDoc = bsoncore.AppendStringElement(plDoc, "fullDocument", string(*cs.options.FullDocument))
|
||
|
}
|
||
|
|
||
|
if cs.options.FullDocumentBeforeChange != nil {
|
||
|
plDoc = bsoncore.AppendStringElement(plDoc, "fullDocumentBeforeChange", string(*cs.options.FullDocumentBeforeChange))
|
||
|
}
|
||
|
|
||
|
if cs.options.ResumeAfter != nil {
|
||
|
var raDoc bsoncore.Document
|
||
|
raDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.ResumeAfter, true, "resumeAfter")
|
||
|
if cs.err != nil {
|
||
|
return nil, cs.err
|
||
|
}
|
||
|
|
||
|
plDoc = bsoncore.AppendDocumentElement(plDoc, "resumeAfter", raDoc)
|
||
|
}
|
||
|
|
||
|
if cs.options.ShowExpandedEvents != nil {
|
||
|
plDoc = bsoncore.AppendBooleanElement(plDoc, "showExpandedEvents", *cs.options.ShowExpandedEvents)
|
||
|
}
|
||
|
|
||
|
if cs.options.StartAfter != nil {
|
||
|
var saDoc bsoncore.Document
|
||
|
saDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.StartAfter, true, "startAfter")
|
||
|
if cs.err != nil {
|
||
|
return nil, cs.err
|
||
|
}
|
||
|
|
||
|
plDoc = bsoncore.AppendDocumentElement(plDoc, "startAfter", saDoc)
|
||
|
}
|
||
|
|
||
|
if cs.options.StartAtOperationTime != nil {
|
||
|
plDoc = bsoncore.AppendTimestampElement(plDoc, "startAtOperationTime", cs.options.StartAtOperationTime.T, cs.options.StartAtOperationTime.I)
|
||
|
}
|
||
|
|
||
|
// Append custom pipeline options.
|
||
|
for optionName, optionValue := range cs.pipelineOptions {
|
||
|
plDoc = bsoncore.AppendValueElement(plDoc, optionName, optionValue)
|
||
|
}
|
||
|
|
||
|
if plDoc, cs.err = bsoncore.AppendDocumentEnd(plDoc, plDocIdx); cs.err != nil {
|
||
|
return nil, cs.err
|
||
|
}
|
||
|
|
||
|
return plDoc, nil
|
||
|
}
|
||
|
|
||
|
func (cs *ChangeStream) pipelineToBSON() (bsoncore.Document, error) {
|
||
|
pipelineDocIdx, pipelineArr := bsoncore.AppendArrayStart(nil)
|
||
|
for i, doc := range cs.pipelineSlice {
|
||
|
pipelineArr = bsoncore.AppendDocumentElement(pipelineArr, strconv.Itoa(i), doc)
|
||
|
}
|
||
|
if pipelineArr, cs.err = bsoncore.AppendArrayEnd(pipelineArr, pipelineDocIdx); cs.err != nil {
|
||
|
return nil, cs.err
|
||
|
}
|
||
|
return pipelineArr, cs.err
|
||
|
}
|
||
|
|
||
|
func (cs *ChangeStream) replaceOptions(wireVersion *description.VersionRange) {
|
||
|
// Cached resume token: use the resume token as the resumeAfter option and set no other resume options
|
||
|
if cs.resumeToken != nil {
|
||
|
cs.options.SetResumeAfter(cs.resumeToken)
|
||
|
cs.options.SetStartAfter(nil)
|
||
|
cs.options.SetStartAtOperationTime(nil)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// No cached resume token but cached operation time: use the operation time as the startAtOperationTime option and
|
||
|
// set no other resume options
|
||
|
if (cs.sess.OperationTime != nil || cs.options.StartAtOperationTime != nil) && wireVersion.Max >= 7 {
|
||
|
opTime := cs.options.StartAtOperationTime
|
||
|
if cs.operationTime != nil {
|
||
|
opTime = cs.sess.OperationTime
|
||
|
}
|
||
|
|
||
|
cs.options.SetStartAtOperationTime(opTime)
|
||
|
cs.options.SetResumeAfter(nil)
|
||
|
cs.options.SetStartAfter(nil)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// No cached resume token or operation time: set none of the resume options
|
||
|
cs.options.SetResumeAfter(nil)
|
||
|
cs.options.SetStartAfter(nil)
|
||
|
cs.options.SetStartAtOperationTime(nil)
|
||
|
}
|
||
|
|
||
|
// ID returns the ID for this change stream, or 0 if the cursor has been closed or exhausted.
|
||
|
func (cs *ChangeStream) ID() int64 {
|
||
|
if cs.cursor == nil {
|
||
|
return 0
|
||
|
}
|
||
|
return cs.cursor.ID()
|
||
|
}
|
||
|
|
||
|
// Decode will unmarshal the current event document into val and return any errors from the unmarshalling process
|
||
|
// without any modification. If val is nil or is a typed nil, an error will be returned.
|
||
|
func (cs *ChangeStream) Decode(val interface{}) error {
|
||
|
if cs.cursor == nil {
|
||
|
return ErrNilCursor
|
||
|
}
|
||
|
|
||
|
return bson.UnmarshalWithRegistry(cs.registry, cs.Current, val)
|
||
|
}
|
||
|
|
||
|
// Err returns the last error seen by the change stream, or nil if no errors has occurred.
|
||
|
func (cs *ChangeStream) Err() error {
|
||
|
if cs.err != nil {
|
||
|
return replaceErrors(cs.err)
|
||
|
}
|
||
|
if cs.cursor == nil {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
return replaceErrors(cs.cursor.Err())
|
||
|
}
|
||
|
|
||
|
// Close closes this change stream and the underlying cursor. Next and TryNext must not be called after Close has been
|
||
|
// called. Close is idempotent. After the first call, any subsequent calls will not change the state.
|
||
|
func (cs *ChangeStream) Close(ctx context.Context) error {
|
||
|
if ctx == nil {
|
||
|
ctx = context.Background()
|
||
|
}
|
||
|
|
||
|
defer closeImplicitSession(cs.sess)
|
||
|
|
||
|
if cs.cursor == nil {
|
||
|
return nil // cursor is already closed
|
||
|
}
|
||
|
|
||
|
cs.err = replaceErrors(cs.cursor.Close(ctx))
|
||
|
cs.cursor = nil
|
||
|
return cs.Err()
|
||
|
}
|
||
|
|
||
|
// ResumeToken returns the last cached resume token for this change stream, or nil if a resume token has not been
|
||
|
// stored.
|
||
|
func (cs *ChangeStream) ResumeToken() bson.Raw {
|
||
|
return cs.resumeToken
|
||
|
}
|
||
|
|
||
|
// Next gets the next event for this change stream. It returns true if there were no errors and the next event document
|
||
|
// is available.
|
||
|
//
|
||
|
// Next blocks until an event is available, an error occurs, or ctx expires. If ctx expires, the error
|
||
|
// will be set to ctx.Err(). In an error case, Next will return false.
|
||
|
//
|
||
|
// If Next returns false, subsequent calls will also return false.
|
||
|
func (cs *ChangeStream) Next(ctx context.Context) bool {
|
||
|
return cs.next(ctx, false)
|
||
|
}
|
||
|
|
||
|
// TryNext attempts to get the next event for this change stream. It returns true if there were no errors and the next
|
||
|
// event document is available.
|
||
|
//
|
||
|
// TryNext returns false if the change stream is closed by the server, an error occurs when getting changes from the
|
||
|
// server, the next change is not yet available, or ctx expires. If ctx expires, the error will be set to ctx.Err().
|
||
|
//
|
||
|
// If TryNext returns false and an error occurred or the change stream was closed
|
||
|
// (i.e. cs.Err() != nil || cs.ID() == 0), subsequent attempts will also return false. Otherwise, it is safe to call
|
||
|
// TryNext again until a change is available.
|
||
|
//
|
||
|
// This method requires driver version >= 1.2.0.
|
||
|
func (cs *ChangeStream) TryNext(ctx context.Context) bool {
|
||
|
return cs.next(ctx, true)
|
||
|
}
|
||
|
|
||
|
func (cs *ChangeStream) next(ctx context.Context, nonBlocking bool) bool {
|
||
|
// return false right away if the change stream has already errored or if cursor is closed.
|
||
|
if cs.err != nil {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
if ctx == nil {
|
||
|
ctx = context.Background()
|
||
|
}
|
||
|
|
||
|
if len(cs.batch) == 0 {
|
||
|
cs.loopNext(ctx, nonBlocking)
|
||
|
if cs.err != nil {
|
||
|
cs.err = replaceErrors(cs.err)
|
||
|
return false
|
||
|
}
|
||
|
if len(cs.batch) == 0 {
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// successfully got non-empty batch
|
||
|
cs.Current = bson.Raw(cs.batch[0])
|
||
|
cs.batch = cs.batch[1:]
|
||
|
if cs.err = cs.storeResumeToken(); cs.err != nil {
|
||
|
return false
|
||
|
}
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
func (cs *ChangeStream) loopNext(ctx context.Context, nonBlocking bool) {
|
||
|
for {
|
||
|
if cs.cursor == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if cs.cursor.Next(ctx) {
|
||
|
// non-empty batch returned
|
||
|
cs.batch, cs.err = cs.cursor.Batch().Documents()
|
||
|
return
|
||
|
}
|
||
|
|
||
|
cs.err = replaceErrors(cs.cursor.Err())
|
||
|
if cs.err == nil {
|
||
|
// Check if cursor is alive
|
||
|
if cs.ID() == 0 {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// If a getMore was done but the batch was empty, the batch cursor will return false with no error.
|
||
|
// Update the tracked resume token to catch the post batch resume token from the server response.
|
||
|
cs.updatePbrtFromCommand()
|
||
|
if nonBlocking {
|
||
|
// stop after a successful getMore, even though the batch was empty
|
||
|
return
|
||
|
}
|
||
|
continue // loop getMore until a non-empty batch is returned or an error occurs
|
||
|
}
|
||
|
|
||
|
if !cs.isResumableError() {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// ignore error from cursor close because if the cursor is deleted or errors we tried to close it and will remake and try to get next batch
|
||
|
_ = cs.cursor.Close(ctx)
|
||
|
if cs.err = cs.executeOperation(ctx, true); cs.err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (cs *ChangeStream) isResumableError() bool {
|
||
|
commandErr, ok := cs.err.(CommandError)
|
||
|
if !ok || commandErr.HasErrorLabel(networkErrorLabel) {
|
||
|
// All non-server errors or network errors are resumable.
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
if commandErr.Code == errorCursorNotFound {
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
// For wire versions 9 and above, a server error is resumable if it has the ResumableChangeStreamError label.
|
||
|
if cs.wireVersion != nil && cs.wireVersion.Includes(minResumableLabelWireVersion) {
|
||
|
return commandErr.HasErrorLabel(resumableErrorLabel)
|
||
|
}
|
||
|
|
||
|
// For wire versions below 9, a server error is resumable if its code is on the allowlist.
|
||
|
_, resumable := resumableChangeStreamErrors[commandErr.Code]
|
||
|
return resumable
|
||
|
}
|
||
|
|
||
|
// Returns true if the underlying cursor's batch is empty
|
||
|
func (cs *ChangeStream) emptyBatch() bool {
|
||
|
return cs.cursor.Batch().Empty()
|
||
|
}
|
||
|
|
||
|
// StreamType represents the cluster type against which a ChangeStream was created.
|
||
|
type StreamType uint8
|
||
|
|
||
|
// These constants represent valid change stream types. A change stream can be initialized over a collection, all
|
||
|
// collections in a database, or over a cluster.
|
||
|
const (
|
||
|
CollectionStream StreamType = iota
|
||
|
DatabaseStream
|
||
|
ClientStream
|
||
|
)
|