
Comments (21)

rwynn commented on May 26, 2024


I think I found the mistake on my part. It was silly. The condition for the for loop should be

for !errsDone || !opsDone {

Notice the || instead of &&. I updated the code sample in this thread.

I think gtm is OK and this was just an error in usage.
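To illustrate why the condition matters, here is a minimal standalone sketch that uses plain Go channels rather than gtm. The `drain` and `closedChannels` helpers are hypothetical names made up for this example, and setting a finished channel to nil is an addition of the sketch, not something taken from the code in this thread.

```go
package main

import "fmt"

// drain counts values received from ops and errs. When waitForBoth is true
// the loop uses the corrected condition (!errsDone || !opsDone); when false
// it uses the earlier condition (!errsDone && !opsDone).
func drain(ops <-chan *int, errs <-chan error, waitForBoth bool) (opCnt, errCnt int) {
	var opsDone, errsDone bool
	keepGoing := func() bool {
		if waitForBoth {
			return !errsDone || !opsDone
		}
		return !errsDone && !opsDone
	}
	for keepGoing() {
		select {
		case op, open := <-ops:
			if op == nil && !open {
				opsDone = true
				ops = nil // a nil channel is never selected again
				break
			}
			opCnt++
		case err, open := <-errs:
			if err == nil && !open {
				errsDone = true
				errs = nil
				break
			}
			errCnt++
		}
	}
	return opCnt, errCnt
}

// closedChannels returns a buffered ops channel holding n values and an
// empty errs channel, both already closed.
func closedChannels(n int) (chan *int, chan error) {
	ops := make(chan *int, n)
	for i := 0; i < n; i++ {
		v := i
		ops <- &v
	}
	close(ops)
	errs := make(chan error)
	close(errs)
	return ops, errs
}

func main() {
	ops, errs := closedChannels(1000)
	got, _ := drain(ops, errs, true)
	fmt.Println("with ||:", got) // always 1000

	ops, errs = closedChannels(1000)
	got, _ = drain(ops, errs, false)
	fmt.Println("with &&:", got) // may be lower than 1000
}
```

With `&&` the loop ends as soon as either channel reports closed, so any ops still sitting in the other channel are never counted. With `||` it keeps going until both are drained.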

from gtm.

mosesBD commented on May 26, 2024

I also have another issue with direct read.
Here is how I start it:

			ctx := gtm.Start(client, &gtm.Options{
				DirectReadNs: []string{config.SourceDB + "." + config.SourceCollection},
			})
			go printOp(ctx)

In my printOp goroutine I simply use the code you have as an example to print operations:

	for {
		select {
		case op = <-ctx.OpC:
			count = count + 1
		case err = <-ctx.ErrC:
			fmt.Printf("got error %+v\n", err)
		default:
			// no op or error is ready right now
			return
		}
	}

I expect this code to print until the collection is fully received; then, since the channel is nil, the default case should run and the goroutine should exit. However, the goroutine hits the default case immediately.
I tried adding 1 to the WaitGroup with Add() and calling Done() after the goroutine finishes.
If I remove the default case, the for loop goes on forever.
How can I access the collection data in direct read mode and then continue with the rest of the code?
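The immediate exit can be reproduced without gtm. A `select` with a `default` case never blocks, so if nothing has been sent yet the `default` branch runs at once. The sketch below uses plain channels; `produce`, `countWithDefault`, and `countUntilClosed` are hypothetical helper names, and the `start` channel stands in for the time the first query takes.

```go
package main

import "fmt"

// countWithDefault mirrors the loop in the question: it returns as soon as
// no value is ready, which can be before the producer has sent anything.
func countWithDefault(ch <-chan int) int {
	count := 0
	for {
		select {
		case <-ch:
			count++
		default:
			return count
		}
	}
}

// countUntilClosed blocks until the producer closes the channel.
func countUntilClosed(ch <-chan int) int {
	count := 0
	for range ch {
		count++
	}
	return count
}

// produce sends n values once start is closed, then closes the channel.
func produce(n int, start <-chan struct{}) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		<-start // stand-in for the time the first query takes
		for i := 0; i < n; i++ {
			ch <- i
		}
	}()
	return ch
}

func main() {
	start := make(chan struct{})
	ch := produce(100, start)
	fmt.Println(countWithDefault(ch)) // 0: nothing has been sent yet
	close(start)
	fmt.Println(countUntilClosed(ch)) // 100
}
```

Removing `default` makes the loop block instead, and it only ends if something closes the channel. That matches the "goes on forever" behaviour described above when nothing ever stops the producer.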

rwynn commented on May 26, 2024

Hopefully this will help...


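package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/rwynn/gtm" // assumed import path; match the gtm version in your go.mod
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// readContext counts ops and errors until both channels are closed and drained.
func readContext(ctx *gtm.OpCtx, done chan bool) {
	var errsDone, opsDone bool
	var opCnt, errCnt int
	for !errsDone || !opsDone {
		select {
		case op, open := <-ctx.OpC:
			if op == nil && !open {
				opsDone = true
				break
			}
			opCnt++
		case err, open := <-ctx.ErrC:
			if err == nil && !open {
				errsDone = true
				break
			}
			errCnt++
		}
	}
	fmt.Printf("Processed %d ops and %d errs\n", opCnt, errCnt)
	done <- true
}

func main() {
	var mongoURL string
	flag.StringVar(&mongoURL, "url", "mongodb://localhost:27017", "MongoDB connection URL")
	flag.Parse()
	log.Printf("Connecting to MongoDB at %s", mongoURL)
	client, err := mongo.NewClient(options.Client().ApplyURI(mongoURL))
	if err != nil {
		log.Fatalln(err)
	}
	if err = client.Connect(context.Background()); err != nil {
		log.Fatalln(err)
	}
	defer client.Disconnect(context.Background())

	done := make(chan bool)
	gtmCtx := gtm.Start(client, &gtm.Options{
		DirectReadNs:   []string{"test.test"},
		ChangeStreamNs: []string{"test.test"},
		MaxAwaitTime:   time.Duration(10) * time.Second,
		OpLogDisabled:  true,
	})
	go readContext(gtmCtx, done)
	gtmCtx.DirectReadWg.Wait() // block until the direct reads finish
	gtmCtx.Stop()              // shut gtm down so the channels get closed
	<-done                     // wait for readContext to drain what is left
	fmt.Println("All done")
}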

module test

go 1.12

require ( v1.4.4 // indirect v1.8.0 // indirect v0.0.1 // indirect v0.4.0 // indirect v1.0.0 // indirect v0.9.1 // indirect v1.0.1-0.20191119151623-081995b34c9c v0.0.0-20190515033939-7706f26af194 // indirect v1.4.0 // indirect v1.0.0 // indirect v0.0.0-20180814205039-7eeb5667e42c // indirect v1.0.0 // indirect v1.2.1 v0.0.0-20200128174031-69ecbb4d6d5d // indirect v0.0.0-20190911185100-cd5d95a43a6e // indirect v0.3.2 // indirect

mosesBD commented on May 26, 2024

Thanks very much.
One question though:
I noticed you have both ChangeStreamNs and DirectReadNs enabled, and OpLogDisabled is true.
Would you please clarify whether this code just reads the collection, or whether it also reads the changes applied to it while the direct read is happening?
It would be great if the Options struct had some comments.

mosesBD commented on May 26, 2024

I ran the code and saw something interesting.
If I print the op and err, the number of processed ops and errs changes.
If I don't print the op and err and just count them (like your code), it reports the correct number of ops.
Otherwise a couple of thousand ops are not counted.
I'm doing a direct read on a collection with about 3 million records.
The database is not under any load, so no fields change during the read.

with operation print:

     Processed 2916557 ops and 0 errs
     All done

without operation print:

    Processed 2918601 ops and 0 errs
    All done

mosesBD commented on May 26, 2024

I did a bit more investigation.
Consider the code below:

     fmt.Println("direct read done")

After the direct read is done you close the ctx, which I think closes the channels and causes the readContext function to return. But when I print the operations, I can see the function still printing operations after fmt.Println("direct read done") has executed, which means the ctx is being closed prematurely.
Any thoughts?
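One possible reading, not confirmed by gtm's source here, is that a WaitGroup tracking the producers only says that everything has been sent, not that the consumer has processed it. With a buffered channel the two can be far apart. The sketch below uses plain channels and a `sync.WaitGroup`; `run`, `release`, and the other names are hypothetical and only serve the illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// run sends total values into a buffered channel, waits for the producer,
// and reports how many values were still buffered at that moment and how
// many the consumer eventually processed.
func run(total int) (buffered, processed int) {
	ops := make(chan int, total)
	done := make(chan int)
	release := make(chan struct{})
	var producers sync.WaitGroup

	producers.Add(1)
	go func() {
		defer producers.Done()
		for i := 0; i < total; i++ {
			ops <- i
		}
	}()

	go func() {
		<-release // a slow consumer, e.g. one that prints every op
		n := 0
		for range ops {
			n++
		}
		done <- n
	}()

	producers.Wait()    // "direct read done": everything has been sent
	buffered = len(ops) // but nothing has been consumed yet
	close(ops)          // closing does not discard buffered values
	close(release)
	processed = <-done // wait for the consumer before exiting
	return buffered, processed
}

func main() {
	buffered, processed := run(5)
	fmt.Println("still buffered after producers finished:", buffered) // 5
	fmt.Println("processed by consumer:", processed)                  // 5
}
```

Seeing output after the "done" message is therefore expected as long as the consumer keeps draining the closed channel and main waits for it.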

rwynn commented on May 26, 2024

I think it may be related to the use of buffered channels in gtm.

Try updating the readContext function I posted and give the name op to the 2 _ vars. Then only set the 2 done flags if...

op == nil && !open

Due to buffering I think that the channel is closed but still has data on it. When the channel is empty the zero value of nil for a pointer will start getting returned.
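The behaviour described here is standard Go channel semantics and can be checked in isolation. In this small sketch the `op` type and the `describe` helper are placeholders invented for the example and are not gtm types.

```go
package main

import "fmt"

// op is a stand-in for a pointer-typed message.
type op struct{ id int }

// describe performs one receive and reports whether a non-nil value
// arrived and whether the channel still counted as open for that receive.
func describe(ch <-chan *op) string {
	v, open := <-ch
	return fmt.Sprintf("value=%v open=%v", v != nil, open)
}

func main() {
	ch := make(chan *op, 2)
	ch <- &op{id: 1}
	ch <- &op{id: 2}
	close(ch) // closed, but two values are still buffered

	fmt.Println(describe(ch)) // value=true open=true
	fmt.Println(describe(ch)) // value=true open=true
	fmt.Println(describe(ch)) // value=false open=false
}
```

Only the third receive returns the nil zero value with `open == false`, which is why the done flag should be set on `op == nil && !open` rather than at the moment the channel is closed.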

rwynn commented on May 26, 2024

Also, to your previous question: with ChangeStreamNs set, yes, you will also receive changes to the collections as they happen. There are methods to check whether the event source is a direct read or a change event. Make that an empty slice if you don't want changes.

You will probably want OpLogDisabled always set to true. This project is old and thus supports reading directly from the oplog, but you don't want to do that anymore now that change streams exist.

rwynn commented on May 26, 2024

I updated my original posted code to account for the buffering.

mosesBD commented on May 26, 2024

I tried the new code and just added fmt.Println(op) and fmt.Println(err), but I still lose some operations:

    Processed 2916562 ops and 0 errs
    All done

It should have 2918601 ops.
Also, ChangeStreamNs was set to empty, since the database has no operations at the moment and simply has 2918601 inserts in it (backup restore).

rwynn commented on May 26, 2024

Can you post the full code that does not work? Also, does it work with or without the Println?

mosesBD commented on May 26, 2024

package main

import (
        "context"
        "flag"
        "fmt"
        "log"
        "time"

        "github.com/rwynn/gtm" // assumed import path; match the gtm version in your go.mod
        "go.mongodb.org/mongo-driver/mongo"
        "go.mongodb.org/mongo-driver/mongo/options"
)

func readContext(ctx *gtm.OpCtx, done chan bool) {
        var errsDone, opsDone bool
        var opCnt, errCnt int
        for !errsDone && !opsDone {
                select {
                case op, open := <-ctx.OpC:
                        if op == nil && !open {
                                opsDone = true
                                break
                        }
                        opCnt++
                case err, open := <-ctx.ErrC:
                        if err == nil && !open {
                                errsDone = true
                                break
                        }
                        errCnt++
                }
        }
        fmt.Printf("Processed %d ops and %d errs\n", opCnt, errCnt)
        done <- true
}

func main() {
        var mongoURL string
        flag.StringVar(&mongoURL, "url", "mongodb://", "MongoDB connection URL")
        flag.Parse()
        log.Printf("Connecting to MongoDB at %s", mongoURL)
        client, err := mongo.NewClient(options.Client().ApplyURI(mongoURL))
        if err != nil {
                log.Fatalln(err)
        }
        if err = client.Connect(context.Background()); err != nil {
                log.Fatalln(err)
        }
        defer client.Disconnect(context.Background())

        done := make(chan bool)
        gtmCtx := gtm.Start(client, &gtm.Options{
                DirectReadNs:   []string{"record_stats.answerlogs"},
                ChangeStreamNs: []string{},
                MaxAwaitTime:   time.Duration(10) * time.Second,
                OpLogDisabled:  true,
        })
        go readContext(gtmCtx, done)
        gtmCtx.DirectReadWg.Wait()
        fmt.Println("Direct read done")
        gtmCtx.Stop()
        <-done
        fmt.Println("All done")
}

mosesBD commented on May 26, 2024

Direct read done
Processed 2918601 ops and 0 errs
All done

Works correctly without the print.
So did the previous code.

rwynn commented on May 26, 2024

Even when I add the Println like you have I still come up with the same number, in my case a collection of size 4000012. Using go version 1.13.4 linux/amd64 here.

That's really a mystery to me why that Print would be affecting the count.

mosesBD commented on May 26, 2024

I'm using 1.13.1 on CentOS 7.
I will try with 1.13.4 and see what happens.
Thank you.

mosesBD commented on May 26, 2024

I tried with Go 1.13.4 but no luck. I also used another server with more resources to make sure there are no bottlenecks there.
When I run the code I posted, the text "Direct read done" is printed, but there are operations left (still printing) before the main routine exits:

Direct read done
map[_id:ObjectID("5e2794e980ea13a924dd989c") answer:-1 created_at: game_id: hint_used: question_id: question_number:7 state:NOT_ANSWERED user_id: zone_number:1]
map[_id:ObjectID("5e2794e980ea13a924dd989d") answer:-1 created_at:game_id: hint_used: question_id: question_number:8 state:NOT_ANSWERED user_id: zone_number:1]
map[_id:ObjectID("5e2495c99dae954a6cb4ea4a") answer:-1 created_at:game_id: hint_used: question_id: question_number:13 state:NOT_ANSWERED user_id: zone_number:1]
Processed 2916590 ops and 0 errs
All done

Is it the same for you?

mosesBD commented on May 26, 2024

On the new server, running without the print returns different results (some correct and some not).
It looks like the problem is not with the code but rather with my distribution or hardware.
Would you please tell me the Linux distro of your mongo server and the mongo version you are using?
Information about the hardware would also be useful.
I am using mongo 4.2.3 on CentOS 7.
The server has 8 GB RAM and 8 CPU cores, using an SSD disk.

mosesBD commented on May 26, 2024

Here are my mongo logs. Looks like latency issues.

2020-02-06T14:10:26.264-0500 I  NETWORK  [listener] connection accepted from #74 (1 connection now open)
2020-02-06T14:10:26.265-0500 I  NETWORK  [conn74] received client metadata from conn74: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:26.266-0500 I  NETWORK  [listener] connection accepted from #75 (2 connections now open)
2020-02-06T14:10:26.266-0500 I  NETWORK  [conn75] received client metadata from conn75: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:26.511-0500 I  COMMAND  [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: {} }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("23fae6e9-0c35-4852-b126-a68a1d5b5e26") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:051BBA2A planCacheKey:051BBA2A reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 238ms
2020-02-06T14:10:26.512-0500 I  NETWORK  [listener] connection accepted from #76 (3 connections now open)
2020-02-06T14:10:26.513-0500 I  NETWORK  [conn76] received client metadata from conn76: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:26.808-0500 I  COMMAND  [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5dd451fdf2b4ae5988416dfd') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("23fae6e9-0c35-4852-b126-a68a1d5b5e26") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 295ms
2020-02-06T14:10:26.809-0500 I  NETWORK  [listener] connection accepted from #77 (4 connections now open)
2020-02-06T14:10:26.809-0500 I  NETWORK  [conn77] received client metadata from conn77: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:27.106-0500 I  COMMAND  [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5dd999b8cef705b56f075a57') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("23fae6e9-0c35-4852-b126-a68a1d5b5e26") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 297ms
2020-02-06T14:10:27.107-0500 I  NETWORK  [listener] connection accepted from #78 (5 connections now open)
2020-02-06T14:10:27.107-0500 I  NETWORK  [conn78] received client metadata from conn78: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:27.414-0500 I  COMMAND  [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5de108bf984e0f8957c747e0') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("23fae6e9-0c35-4852-b126-a68a1d5b5e26") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 307ms
2020-02-06T14:10:27.418-0500 I  NETWORK  [listener] connection accepted from #79 (6 connections now open)
2020-02-06T14:10:27.418-0500 I  NETWORK  [conn79] received client metadata from conn79: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:27.737-0500 I  COMMAND  [conn78] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5dfc9f870ff0bbb61395759a') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 321ms
2020-02-06T14:10:28.076-0500 I  COMMAND  [conn78] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5e0dec7411714ed41f504082') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 338ms
2020-02-06T14:10:28.472-0500 I  COMMAND  [conn78] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5e16361af67e1200a5f8eab2') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 395ms
2020-02-06T14:10:28.913-0500 I  COMMAND  [conn78] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5e1cbfe7d01063c2f09edccd') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 440ms
2020-02-06T14:10:29.485-0500 I  COMMAND  [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5e217712c7b942a926c26db7') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 561ms
2020-02-06T14:10:36.137-0500 I  NETWORK  [conn79] end connection (5 connections now open)
2020-02-06T14:10:36.137-0500 I  NETWORK  [conn75] end connection (3 connections now open)
2020-02-06T14:10:36.137-0500 I  NETWORK  [conn78] end connection (1 connection now open)
2020-02-06T14:10:36.137-0500 I  NETWORK  [conn74] end connection (0 connections now open)
2020-02-06T14:10:36.137-0500 I  NETWORK  [conn76] end connection (2 connections now open)
2020-02-06T14:10:36.137-0500 I  NETWORK  [conn77] end connection (4 connections now open)

mosesBD commented on May 26, 2024

I ran the code a couple more times on both servers.
Looks like I was wrong: even on the slower server, just printing the count can produce different results, but it happens less often.
The slower server is running mongo 4.2.1.
This is probably a matter of hardware and mongo version.

mosesBD commented on May 26, 2024

Looks like that was it.
Testing with count and print works fine.

mosesBD commented on May 26, 2024

I checked a couple more times and it is working :)
How do I tell whether the operation I am receiving comes from DirectReadNs or ChangeStreamNs?
