
gtm's Introduction

gtm

gtm (go tail mongo) is a utility written in Go which tails the MongoDB oplog and sends create, update, delete events to your code. It can be used to send emails to new users, index documents, write time series data, or something else.

This branch is a port of the original gtm to the new official Go driver from MongoDB. The original gtm uses the community mgo driver. To keep using the community mgo driver, use the legacy branch.

Requirements

Installation

go get github.com/rwynn/gtm/v2

Setup

gtm uses the MongoDB oplog as an event source. You will need to ensure that MongoDB is configured to produce an oplog by deploying a replica set.

If you haven't already done so, follow the 5 step procedure to initiate and validate your replica set. For local testing your replica set may contain a single member.
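
For local testing you can even initiate the replica set from Go. The following is a minimal sketch, assuming mongod was started with --replSet rs0 on localhost:27017; running replSetInitiate with an empty config is roughly what rs.initiate() does in the shell:

// initrs.go - a one-time setup sketch, not part of gtm itself
package main

import (
	"context"
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	// connect directly to the single node; it is not yet an initiated member
	client, err := mongo.Connect(ctx, options.Client().
		ApplyURI("mongodb://localhost:27017/?connect=direct"))
	if err != nil {
		panic(err)
	}
	defer client.Disconnect(context.Background())
	// an empty config asks the server to build a default one-member config
	res := client.Database("admin").RunCommand(ctx,
		bson.D{{Key: "replSetInitiate", Value: bson.D{}}})
	if res.Err() != nil {
		panic(res.Err())
	}
	fmt.Println("replica set initiated")
}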

Usage

package main

import (
	"context"
	"fmt"
	"reflect"
	"time"

	"github.com/rwynn/gtm/v2"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/bsontype"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
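	// build a custom decode registry so BSON datetime values unmarshal
	// into time.Time rather than the driver's primitive.DateTime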
	rb := bson.NewRegistryBuilder()
	//rb.RegisterTypeMapEntry(bsontype.Timestamp, reflect.TypeOf(time.Time{}))
	rb.RegisterTypeMapEntry(bsontype.DateTime, reflect.TypeOf(time.Time{}))
	reg := rb.Build()
	clientOptions := options.Client()
	clientOptions.SetRegistry(reg)
	clientOptions.ApplyURI("mongodb://localhost:27017")
	client, err := mongo.NewClient(clientOptions)
	if err != nil {
		panic(err)
	}
	ctxm, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	err = client.Connect(ctxm)
	if err != nil {
		panic(err)
	}
	defer client.Disconnect(context.Background())
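	// read test.test in full via direct reads, then stream new changes via
	// change streams; legacy oplog tailing is disabled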
	ctx := gtm.Start(client, &gtm.Options{
		DirectReadNs: []string{"test.test"},
		ChangeStreamNs: []string{"test.test"},
		MaxWaitSecs: 10,
		OpLogDisabled: true,
	})
	for {
		select {
		case err := <-ctx.ErrC:
			fmt.Printf("got err %+v\n", err)
		case op := <-ctx.OpC:
			fmt.Printf("got op %+v\n", op)
		}
	}
}

Configuration

func PipeBuilder(namespace string, changeStream bool) ([]interface{}, error) {

	// to build your pipelines for change events you will want to reference
	// the MongoDB reference for change events at 
	// https://docs.mongodb.com/manual/reference/change-events/

	// you will only receive changeStream == true when you configure gtm with
	// ChangeStreamNs (requires MongoDB 3.6+). You cannot build pipelines for
	// changes using legacy direct oplog tailing

	if namespace == "users.users" {
		// given a set of docs like {username: "joe", email: "joe@example.com", amount: 1}
		if changeStream {
			return []interface{}{
				bson.M{"$match": bson.M{"fullDocument.username": "joe"}},
			}, nil
		} else {
			return []interface{}{
				bson.M{"$match": bson.M{"username": "joe"}},
			}, nil
		}
	} else if namespace == "users.status" && changeStream {
		// return a pipeline that only receives events when a document is 
		// inserted, deleted, or a specific field is changed. In this case
		// only a change to field1 is processed.  Changes to other fields
		// do not match the pipeline query and thus you won't receive the event.
		return []interface{}{
			bson.M{"$match": bson.M{"$or": []interface{} {
				bson.M{"updateDescription": bson.M{"$exists": false}},
				bson.M{"updateDescription.updatedFields.field1": bson.M{"$exists": true}},
			}}},
		}, nil
	}
	return nil, nil
}

func NewUsers(op *gtm.Op) bool {
	return op.Namespace == "users.users" && op.IsInsert()
}

// if you want to listen only for certain events on certain collections
// pass a filter function in options
ctx := gtm.Start(client, &gtm.Options{
	NamespaceFilter: NewUsers, // only receive inserts in the user collection
})
// more options are available for tuning
ctx := gtm.Start(client, &gtm.Options{
	NamespaceFilter:     nil,           // op filter function that has access to type/ns ONLY
	Filter:              nil,           // op filter function that has access to type/ns/data
	After:               nil,           // if nil defaults to gtm.LastOpTimestamp; not yet supported for ChangeStreamNs
	OpLogDisabled:       false,         // true to disable tailing the MongoDB oplog
	OpLogDatabaseName:   nil,           // defaults to "local"
	OpLogCollectionName: nil,           // defaults to "oplog.rs"
	ChannelSize:         0,             // defaults to 20
	BufferSize:          25,            // defaults to 50. used to batch fetch documents on bursts of activity
	BufferDuration:      0,             // defaults to 750 ms. after this timeout the batch is force fetched
	WorkerCount:         8,             // defaults to 1. number of goroutines batch fetching concurrently
	Ordering:            gtm.Document,  // defaults to gtm.Oplog. ordering guarantee of events on the output channel relative to the oplog
	UpdateDataAsDelta:   false,         // set to true to only receive delta information in the Data field on updates (straight from the oplog)
	DirectReadNs:        []string{"db.users"}, // set to a slice of namespaces (collections or views) to read data directly from
	DirectReadSplitMax:  9,             // the max number of times to split a collection for concurrent reads (impacts memory consumption)
	Pipe:                PipeBuilder,   // an optional function to build aggregation pipelines
	PipeAllowDisk:       false,         // true to allow MongoDB to use disk for aggregation pipeline stages with large result sets
	Log:                 myLogger,      // pass your own logger
	ChangeStreamNs:      []string{"db.col1", "db.col2"}, // MongoDB 3.6+ only; set to a slice of namespaces to read via MongoDB change streams
})

Direct Reads

If, in addition to tailing the oplog, you would like to read entire collections, you can set the DirectReadNs field to a slice of MongoDB namespaces. Documents from these collections will be read directly and output on the ctx.OpC channel.

You can wait until all the collections have been fully read by using the DirectReadWg wait group on the ctx.

go func() {
	ctx.DirectReadWg.Wait()
	fmt.Println("direct reads are done")
}()
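
For a one-shot export (direct reads only, no tailing), here is a hedged sketch built from the options and channels shown above; note that any ops still buffered on ctx.OpC when done fires are dropped, so treat it as illustrative:

ctx := gtm.Start(client, &gtm.Options{
	DirectReadNs:  []string{"test.test"},
	OpLogDisabled: true, // direct reads only, no change events
})
done := make(chan struct{})
go func() {
	ctx.DirectReadWg.Wait() // blocks until every DirectReadNs collection is read
	ctx.Stop()              // stop the goroutines started by gtm.Start
	close(done)
}()
for {
	select {
	case op := <-ctx.OpC:
		fmt.Printf("read %+v\n", op)
	case err := <-ctx.ErrC:
		fmt.Printf("err %+v\n", err)
	case <-done:
		return
	}
}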

Pause, Resume, Since, and Stop

You can pause, resume, or seek to a timestamp from the oplog. These methods affect only change events, not direct reads.

go func() {
	ctx.Pause()
	time.Sleep(time.Duration(2) * time.Minute)
	ctx.Resume()
	ctx.Since(previousTimestamp)
}()

You can stop all goroutines created by Start or StartMulti. You cannot resume a context once it has been stopped. You would need to create a new one.

go func() {
	ctx.Stop()
	fmt.Println("all goroutines are stopped")
}()

Custom Unmarshalling

If you'd like to unmarshal MongoDB documents into your own struct, instead of having each document unmarshalled into a generic map[string]interface{}, you can use a custom unmarshal function:

type MyDoc struct {
	Id  interface{} `bson:"_id"`
	Foo string      `bson:"foo"`
}

func custom(namespace string, data []byte) (interface{}, error) {
	// use namespace, e.g. db.col, to map to a custom struct
	if namespace == "test.test" {
		var doc MyDoc
		if err := bson.Unmarshal(data, &doc); err == nil {
			return doc, nil
		} else {
			return nil, err
		}
	}
	return nil, errors.New("unsupported namespace")
}

ctx := gtm.Start(client, &gtm.Options{
	Unmarshal: custom,
})

for {
	select {
	case op := <-ctx.OpC:
		if op.Namespace == "test.test" {
			doc := op.Doc.(MyDoc)
			fmt.Println(doc.Foo)
		}
	}
}

Workers

You may want to distribute event handling between a set of worker processes on different machines. To do this you can leverage the github.com/rwynn/gtm/consistent package.

Create a TOML document containing a list of all the event handlers.

Workers = [ "Tom", "Dick", "Harry" ] 

Create a consistent filter to distribute the work between Tom, Dick, and Harry. A consistent filter needs to access the Data attribute of each op, so it must be set as a Filter as opposed to a NamespaceFilter.

name := flag.String("name", "", "the name of this worker")
flag.Parse()
filter, filterErr := consistent.ConsistentHashFilterFromFile(*name, "/path/to/toml")
if filterErr != nil {
	panic(filterErr)
}

// there is also a method consistent.ConsistentHashFilterFromDocument which allows
// you to pass a MongoDB document representing the config if you would like to avoid
// copying the same config file to multiple servers

Pass the filter into the options when calling gtm.Start:

ctx := gtm.Start(client, &gtm.Options{Filter: filter})

If you have multiple filters, you can use the gtm utility method ChainOpFilters:

func ChainOpFilters(filters ...OpFilter) OpFilter
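
For example, assuming the chain passes an op only when every filter accepts it, you could combine the worker filter with the NewUsers filter defined earlier:

// an op must satisfy both filters to be delivered (assumed AND semantics)
combined := gtm.ChainOpFilters(NewUsers, filter)
ctx := gtm.Start(client, &gtm.Options{Filter: combined})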

gtm's People

Contributors

antonsergeyev, rwynn, v1c77, zph

gtm's Issues

How to specify the After option?

I have tried to set the After property of gtm.Options, but when I assign it a MongoTimestamp I get the following error:

(type bson.MongoTimestamp) as type gtm.TimestampGenerator in field value

Could you provide an example of how to use this field, and explain why you decided not to use the type bson.MongoTimestamp?

Thanks

Unmarshal Op.Data to a struct instead of a map

I would like to be able to unmarshal Op.Data into my own struct in the same way that bson does. I could convert the map to a struct myself but that seems error prone. I don't see a way to get access to the raw data to do it myself. Any thoughts?
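
The Custom Unmarshalling section above addresses this directly. Failing that, a hedged workaround is to round-trip the generic map through bson; MyDoc stands in for your own struct type:

// re-marshal the generic map, then unmarshal into the typed struct
raw, err := bson.Marshal(op.Data)
if err != nil {
	return err
}
var doc MyDoc
if err := bson.Unmarshal(raw, &doc); err != nil {
	return err
}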

Multithread direct reads

Direct reads seem to be running off of one cursor, using gte query+sort pagination on _id.

This can be pretty inefficient for sharded clusters/collections. For such collections, it's faster to query on chunk ranges and in parallel. The mongo-hadoop connector does this here.

I imagine that the same applies to non-sharded collections too - if the _id range is "guess-chunked" well (i.e. approximately same number of documents per chunk range).

Usually, if I query all documents in say, a 10M document collection, fetching with a single cursor will be pretty slow despite none of the mongod server resource usage (network, CPU, disk) even getting near to being maxed out. I think this is typical with TCP transfers - e.g. how HTTP "download accelerators" get 4x-6x the speed using multiple threads with different Range headers, compared to a single thread.

For some perspective: I'm currently getting (on the monstache machine) ~2 Mbit/s average with ~23 Mbit/s peak with DirectRead in monstache on the same local virtual network in Azure, compared to ~35 Mbit/s average with 90 Mbit/s peak in poorly written (bad chunking) multithreaded/multi-cursor .NET code reading from the same collection over the Internet.

If it's agreed that adding multi-cursor/threaded direct reads would be good, I'd be happy to submit a pull request, but am unsure how to best make it configurable (Do you make multi-cursor DirectRead optional? How do you specify the chunking method? etc).

What is the difference between Op.Data and Op.Doc?

type Op struct {
	Id                interface{}            `json:"_id"`
	Operation         string                 `json:"operation"`
	Namespace         string                 `json:"namespace"`
	Data              map[string]interface{} `json:"data,omitempty"`
	Timestamp         primitive.Timestamp    `json:"timestamp"`
	Source            QuerySource            `json:"source"`
	Doc               interface{}            `json:"doc,omitempty"`
	UpdateDescription map[string]interface{} `json:"updateDescription,omitempty"`
	ResumeToken       OpResumeToken          `json:"-"`
}

I want to get the latest version of a document after it changes. For example, if I $unset the "room_id" field of a document in the "lessons" collection, I need the resulting doc data without the "room_id" field.
Should I use Op.Data or Op.Doc? Is Op.Doc the value of a "Doc" field inside Op.Data?
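
As far as this README shows, op.Data is the generic map gtm fetches for each event, while op.Doc is only populated when you configure a custom Unmarshal function (see Custom Unmarshalling above); Op.Doc is not the value of a field inside Op.Data. A hedged sketch for the $unset case, assuming an IsUpdate helper analogous to the IsInsert shown earlier and a hypothetical test.lessons namespace:

for op := range ctx.OpC {
	// op.Data holds the fetched document state as a generic map
	if op.Namespace == "test.lessons" && op.IsUpdate() { // IsUpdate is assumed
		if _, ok := op.Data["room_id"]; !ok {
			fmt.Println("room_id was unset on this document")
		}
	}
}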

What to do when ctx.ErrC channel provides an error?

This is more of a question and not an issue:

I'm trying to figure out what we should do when we get an error. I don't want to skip oplog messages.

Some initial ideas:

  1. We keep going, trusting that gtm will handle the error and we won't skip oplog records
  2. Stop (ctx.Stop()) and restart with gtm.Start, using the offset we have been saving each time we successfully handle an event from ctx.OpC

Your thoughts would be appreciated. Basically, to ensure handling of all oplog messages, how would we deal with ctx.ErrC events?

Thanks,
Mike
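
A hedged sketch of idea 2, using only APIs that appear elsewhere in this README (op.Timestamp, ctx.Since) and assuming Since accepts the primitive.Timestamp carried on each Op; handle is a hypothetical event processor:

var last primitive.Timestamp
for {
	select {
	case err := <-ctx.ErrC:
		// seek the oplog back to the last op we know we handled,
		// rather than skipping past the failure
		fmt.Printf("gtm error %+v, resuming from %+v\n", err, last)
		ctx.Since(last)
	case op := <-ctx.OpC:
		if err := handle(op); err == nil { // handle is hypothetical
			last = op.Timestamp
		}
	}
}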

add some tests

Would be great to have some tests in place so there's some confidence that this stays working. Would also likely improve the code structure.

Missing op.Data when alterations to same ns + id happen within same Timestamp Epoch second

Using this sha: 6cf4c1b

We observed that occasionally data ops would come through GTM channel that had an empty op.Data.

We traced it down to circumstances where two operations in the oplog happened in very close proximity: Timestamp(1489513643, 3) and Timestamp(1489513643, 5).

The code responsible was here:

gtm/gtm.go, line 197 at 6cf4c1b:

for _, result := range results {

where it was assumed that there's a 1-1 mapping between results from Mongo and this.Entries. By assigning based on Mongo results and keying off the id, it skipped the operation with Timestamp(1489513643, 3).

It is fixed in the newest commit, here: https://github.com/rwynn/gtm/blob/master/gtm.go#L218-L222, where byId[resultId] is now a slice instead of a single *Op.

So once I upgraded moresql to e400ebf, that issue was resolved.

At that point, I needed to tweak moresql to make a copy of the op as it comes from gtm's tail channel, otherwise there are concurrent map read/write errors. It might be worth considering sending new copies of op.Data for each result so that this is solved in GTM instead of implemented in client libs depending on GTM.

for _, o := range mapped {
	data := make(map[string]interface{})
	// Create copy in case of multiple ops using same mongo result
	for k, v := range result {
		data[k] = v
	}
	o.Data = data
}

That proposed change is represented here: #8.

Feel free to close this issue once you read through. Since it's mostly fixed in master, I wanted to leave a troubleshooting record for other folks who might come along.

unset slice occurs

Mistakes happened when I migrated docs from one table to another.
A source doc like {"A":[], "B":"name"} came out as {"B":"name"}; the A field was unset.
This happens when the parameter "a" of the function "normalizeDocSlice" is a zero-length slice rather than a nil pointer, so the returned variable "avs" is nil.

I guess var avs = make([]interface{}, 0) may solve my problem.

how to use gtm to sync to a mongo primary

I'm trying to use gtm to sync two mongo instances.
The first instance has been running for a while and has roughly 50GB of data.
My question is: if I do a direct read and start writing to the secondary database, what can I do about the changes made to the primary database while I'm reading the data, since it can change after I read it?
My own idea is to store the oplog while doing the direct read and then apply it to the second database afterwards.
I understand that there are already tools for this, but this is part of a bigger project and I need to achieve this via code.
Any better options?

column order problem

We found that the field (column) order of MongoDB documents parsed by gtm is lost. If I incrementally import data to another MongoDB, the order of fields is inconsistent with the source database. Is there any way to solve this?
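
Field order is lost because op.Data is a map[string]interface{}, and Go maps are unordered. A hedged sketch using the Custom Unmarshalling hook described above: decode each document into bson.D, an order-preserving slice of key/value pairs, so that op.Doc keeps the source order:

// decode every document into an order-preserving bson.D
func ordered(namespace string, data []byte) (interface{}, error) {
	var doc bson.D
	if err := bson.Unmarshal(data, &doc); err != nil {
		return nil, err
	}
	return doc, nil
}

ctx := gtm.Start(client, &gtm.Options{Unmarshal: ordered})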

Limit object reads to specific databases/collections

We're using moresql (which uses this library) to send certain collections from a MongoDB server to Postgres. It seems like it would require fewer permissions (and possibly be more efficient) if there were a way to tell gtm to ignore changes to other collections.

In particular, it seems that gtm's Flush function attempts to find all results from any oplog change to pass up to its user, and so when there's a change to system.sessions, we attempt to load that data and fail with a permissions error on our hosted Mongo server.

As a quick fix, I just hardcoded an exclusion for the "system" db, but it would be nice if there were some more principled option for this.

Additional Details

After running for a bit (between a few seconds and a couple minutes), moresql is dying with

FATA[0052] Exiting: Mongo tailer returned error not authorized on config to execute command { find: "system.sessions", filter: { _id: { $in: [ { id: UUID("09f2d949-ddbd-410f-9f3c-ac6cd5ed609b"), uid: BinData(0, A692C2C7CEBFE5DFCBBE61D42E17B4B0617EDE8A36FFCFD7B4FCE10C9D322C4B) }, [snip] ) } ] } }, skip: 0, $db: "config" }

We're using a hosted mongo solution that doesn't seem to allow us to grant read access on the system collections, so just giving the moresql user the relevant permissions isn't an option.

By adding a line to gtm's Flush function that skips any update on the config DB, I was able to suppress this error. It would be nice to have a more robust solution, though.

(This ticket largely copied from zph/moresql#15)
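
Until a built-in option exists, a hedged workaround is the documented NamespaceFilter, which sees only the type and namespace of each op; whether it runs early enough to prevent the Flush fetch is an assumption about gtm internals:

// skip ops from databases and collections we never want gtm to fetch
func skipSystem(op *gtm.Op) bool {
	db := strings.SplitN(op.Namespace, ".", 2)[0]
	if db == "config" || db == "local" || db == "admin" {
		return false
	}
	return !strings.Contains(op.Namespace, "system.")
}

ctx := gtm.Start(client, &gtm.Options{NamespaceFilter: skipSystem})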

Error importing v2.0.6 with new official mongo golang package

I just started a very simple Go project for change streams with MongoDB's official Go package. I am using the latest version of both packages. My go.mod file looks like:

module mongos

go 1.13

require (
	github.com/rwynn/gtm v2.0.6
	go.mongodb.org/mongo-driver v1.3.4
)

When I try to build with go build main.go I get the following error message:

go: errors parsing go.mod:
/Users/johnsmith/Desktop/Mongos/go.mod:6: require github.com/rwynn/gtm: version "v2.0.6" invalid: module contains a go.mod file, so major version must be compatible: should be v0 or v1, not v2
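
This is Go's semantic import versioning: a module at major version 2 or above must be required through its /vN path. A hedged fix, assuming the v2.0.6 tag carries the /v2 module path:

module mongos

go 1.13

require (
	github.com/rwynn/gtm/v2 v2.0.6
	go.mongodb.org/mongo-driver v1.3.4
)

Then import it in code as github.com/rwynn/gtm/v2, as the Usage example above does.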

Add semver tags

Hey, do you mind adding some version tags to allow explicit vgo dependency declaration on your repo? Like, say, a v1.0.0 for current master? Thanks.

License?

Would you be willing to add a license?

Thank you.

what’s the namespace?

Is this namespace MongoDB's collection and field?
In this example, where namespace == "users.users" and namespace == "users.status", is "users" the collection name, and is "status" the name of a field in the document?

func PipeBuilder(namespace string, changeStream bool) ([]interface{}, error) {

	// to build your pipelines for change events you will want to reference
	// the MongoDB reference for change events at 
	// https://docs.mongodb.com/manual/reference/change-events/

	// you will only receive changeStream == true when you configure gtm with
	// ChangeStreamNs (requires MongoDB 3.6+). You cannot build pipelines for
	// changes using legacy direct oplog tailing

	if namespace == "users.users" {
		// given a set of docs like {username: "joe", email: "joe@example.com", amount: 1}
		if changeStream {
			return []interface{}{
				bson.M{"$match": bson.M{"fullDocument.username": "joe"}},
			}, nil
		} else {
			return []interface{}{
				bson.M{"$match": bson.M{"username": "joe"}},
			}, nil
		}
	} else if namespace == "users.status" && changeStream {
		// return a pipeline that only receives events when a document is 
		// inserted, deleted, or a specific field is changed. In this case
		// only a change to field1 is processed.  Changes to other fields
		// do not match the pipeline query and thus you won't receive the event.
		return []interface{}{
			bson.M{"$match": bson.M{"$or": []interface{} {
				bson.M{"updateDescription": bson.M{"$exists": false}},
				bson.M{"updateDescription.updatedFields.field1": bson.M{"$exists": true}},
			}}},
		}, nil
	}
	return nil, nil
}

Implement Bulk Fetching

Implement bulk fetching to approach or match the throughput optimization of PR #3 by @zph, while at the same time maintaining the ordering semantics of the oplog.
