rwynn / gtm
gtm (go tail mongo) is a MongoDB event listener.
License: MIT License
I'm trying to use gtm to sync two Mongo instances.
The first instance has been running for a while and has roughly 50GB of data.
My question is: if I do a direct read and start writing to the secondary database, what can I do about changes made to the primary database while I'm reading, since the data can change after I read it?
My own idea is to record the oplog while doing the direct read and then apply it to the second database.
I understand that there are already tools for this, but this is part of a bigger project and I need to achieve it in code.
Any better options?
Would be great to have some tests in place so there's some confidence that this stays working. Would also likely improve the code structure.
Only default BufferDuration if zero instead of less than one.
This is more of a question and not an issue:
I'm trying to figure out what we should do when we get an error. I don't want to skip oplog messages.
Some initial ideas:
Your thoughts would be appreciated. Basically, to ensure handling of all oplog messages, how would we deal with ctx.ErrC events?
Thanks,
Mike
Incorporate original recommendation from @zph in #3, but with the addition of a setting to control the ordering semantics of events on the output channel.
Based on performance numbers gathered by @zph at http://data.xargs.io/20170213165151-performance.html
Just spent a lot of time battling with bson.MongoTimestamp and still can't figure out how to do this.
When keying operations in a Go map, keys must include the namespace AND the _id, because MongoDB's _id is not guaranteed to be unique across collections.
Direct reads seem to be running off of one cursor, using gte query+sort pagination on _id.
This can be pretty inefficient for sharded clusters/collections. For such collections, it's faster to query on chunk ranges and in parallel; the mongo-hadoop connector does this. I imagine that the same applies to non-sharded collections too, if the _id range is "guess-chunked" well (i.e. approximately the same number of documents per chunk range).
Usually, if I query all documents in, say, a 10M document collection, fetching with a single cursor will be pretty slow even though none of the mongod server resources (network, CPU, disk) come close to being maxed out. I think this is typical of TCP transfers, e.g. how HTTP "download accelerators" get 4x-6x the speed by using multiple threads with different Range headers, compared to a single thread.
For some perspective: I'm currently getting (on the monstache machine) ~2 Mbits/s average with ~23 MBit/s peak with DirectRead in monstache on the same local virtual network in Azure, compared to ~35 MBit/s average with 90 MBit/s peak in poorly written (bad chunking) multithreaded/multi-cursor .NET code reading from the same collection over the Internet.
If it's agreed that adding multi-cursor/threaded direct reads would be good, I'd be happy to submit a pull request, but am unsure how to best make it configurable (Do you make multi-cursor DirectRead optional? How do you specify the chunking method? etc).
Using this sha: 6cf4c1b
We observed that occasionally data ops would come through the GTM channel with an empty op.Data.
We traced it down to circumstances where two operations in the oplog happened in very close proximity: Timestamp(1489513643, 3) and Timestamp(1489513643, 5).
The code responsible was the assignment to this.Entries (gtm.go line 197 in 6cf4c1b). By assigning based on Mongo results and keying off the id, it skipped the operation with Timestamp(1489513643, 3).
It is fixed in the newest commit, here: https://github.com/rwynn/gtm/blob/master/gtm.go#L218-L222, where byId[resultId] is now a slice instead of a single *Op.
So once I upgraded moresql to e400ebf, that issue was resolved.
At that point, I needed to tweak moresql to make a copy of each op as it comes from gtm's tail channel, otherwise there are concurrent map read/write errors. It might be worth considering sending new copies of op.Data for each result so that this is solved in GTM instead of implemented in client libraries depending on GTM.
for _, o := range mapped {
data := make(map[string]interface{})
// Create copy in case of multiple ops using same mongo result
for k, v := range result {
data[k] = v
}
o.Data = data
}
That proposed change is represented here: #8.
Feel free to close this issue once you read through. Since it's mostly fixed in master, I wanted to leave a troubleshooting record for other folks who might come along.
We're using moresql (which uses this library) to send certain collections from a MongoDB server to Postgres. It seems it would require fewer permissions (and possibly be more efficient) if there were a way to tell gtm to ignore changes to other collections.
In particular, it seems that gtm's Flush function attempts to find all results from any oplog change to pass up to its user, so when there's a change to system.sessions, we attempt to load that data and fail with a permissions error on our hosted Mongo server.
As a quick fix, I just hardcoded an exclusion for the "system" db, but it would be nice if there were some more principled option for this.
After running for a bit (between a few seconds and a couple minutes), moresql is dying with
FATA[0052] Exiting: Mongo tailer returned error not authorized on config to execute command { find: "system.sessions", filter: { _id: { $in: [ { id: UUID("09f2d949-ddbd-410f-9f3c-ac6cd5ed609b"), uid: BinData(0, A692C2C7CEBFE5DFCBBE61D42E17B4B0617EDE8A36FFCFD7B4FCE10C9D322C4B) }, [snip] ) } ] } }, skip: 0, $db: "config" }
We're using a hosted mongo solution that doesn't seem to allow us to grant read access on the system collections, so just giving the moresql user the relevant permissions isn't an option.
By adding a line to gtm's Flush function that skips any update on the config DB, I was able to suppress this error. It would be nice to have a more robust solution, though.
(This ticket largely copied from zph/moresql#15)
I would like to be able to unmarshal Op.Data into my own struct in the same way that bson does. I could convert the map to a struct myself but that seems error prone. I don't see a way to get access to the raw data to do it myself. Any thoughts?
Would you be willing to add a license?
Thank you.
I have tried to specify the After property of gtm.Options, but when I set After to a MongoTimestamp, I get the following error:
(type bson.MongoTimestamp) as type gtm.TimestampGenerator in field value
Could you provide an example (usage of the field) and explain why you decided not to use the type bson.MongoTimestamp?
Thanks
I just started a very simple Go project using change streams with the official MongoDB Go driver. I am using the latest version of both packages. My go.mod file looks like:
module mongos
go 1.13
require (
github.com/rwynn/gtm v2.0.6
go.mongodb.org/mongo-driver v1.3.4
)
When I try to build with go build main.go, I get the following error message:
go: errors parsing go.mod:
/Users/johnsmith/Desktop/Mongos/go.mod:6: require github.com/rwynn/gtm: version "v2.0.6" invalid: module contains a go.mod file, so major version must be compatible: should be v0 or v1, not v2
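Per Go's semantic import versioning rules, a v2+ module must be required (and imported) by its /v2 path. Assuming the tag being pinned declares `module github.com/rwynn/gtm/v2` in its go.mod, the require block would look like the fragment below; if the tag's go.mod does not declare the /v2 path, that tag cannot be required as v2 at all, and the fallback is a v0/v1 tag or a pseudo-version.

```
module mongos

go 1.13

require (
	github.com/rwynn/gtm/v2 v2.0.6
	go.mongodb.org/mongo-driver v1.3.4
)
```

Import paths in the source must then also use github.com/rwynn/gtm/v2.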
We found that the MongoDB oplog entries parsed by gtm lose the field order of documents. If I incrementally import data into another MongoDB, the field order is inconsistent with the source database. Is there any way to solve this?
Does this namespace refer to MongoDB's collection and field?
In this example, namespace == "users.users" and namespace == "users.status": is "users" the collection name, and "status" the name of a field in the Mongo document?
func PipeBuilder(namespace string, changeStream bool) ([]interface{}, error) {
// to build your pipelines for change events you will want to reference
// the MongoDB reference for change events at
// https://docs.mongodb.com/manual/reference/change-events/
// you will only receive changeStream == true when you configure gtm with
// ChangeStreamNS (requires MongoDB 3.6+). You cannot build pipelines for
// changes using legacy direct oplog tailing
if namespace == "users.users" {
// given a set of docs like {username: "joe", email: "[email protected]", amount: 1}
if changeStream {
return []interface{}{
bson.M{"$match": bson.M{"fullDocument.username": "joe"}},
}, nil
} else {
return []interface{}{
bson.M{"$match": bson.M{"username": "joe"}},
}, nil
}
} else if namespace == "users.status" && changeStream {
// return a pipeline that only receives events when a document is
// inserted, deleted, or a specific field is changed. In this case
// only a change to field1 is processed. Changes to other fields
// do not match the pipeline query and thus you won't receive the event.
return []interface{}{
bson.M{"$match": bson.M{"$or": []interface{} {
bson.M{"updateDescription": bson.M{"$exists": false}},
bson.M{"updateDescription.updatedFields.field1": bson.M{"$exists": true}},
}}},
}, nil
}
return nil, nil
}
output.Drop = true
I find this can only delete by the original document's mongo["_id"], but I want to delete by a custom id.
Hey, do you mind adding some version tags to allow explicit vgo dependency declaration on your repo? Like, say, a v1.0.0 for current master? Thanks.
If I want to resume from a timestamp, how should I configure it?
Errors happened when I migrated docs from one collection to another.
Source doc: {"A":[], "B":"name"}
Result doc: {"B":"name"}
The A field was unset.
This happens when the param "a" of func "normalizeDocSlice" is a zero-length slice rather than a nil pointer: the returned var "avs" is nil. I guess avs = make([]interface{}, 0)
may solve my problem.
type Op struct {
Id interface{} `json:"_id"`
Operation string `json:"operation"`
Namespace string `json:"namespace"`
Data map[string]interface{} `json:"data,omitempty"`
Timestamp primitive.Timestamp `json:"timestamp"`
Source QuerySource `json:"source"`
Doc interface{} `json:"doc,omitempty"`
UpdateDescription map[string]interface{} `json:"updateDescription,omitempty"`
ResumeToken OpResumeToken `json:"-"`
}
I want to get the latest document after a change. For example, in the "lessons" collection, when I update with "$unset" on the "room_id" field, I need the doc data without the "room_id" field.
Should I select Op.Data or Op.Doc? Is Op.Doc the value of the Op.Data field "Doc"?