kinsumer's People

Contributors

brentnd, dcelasun, garethlewin, matwalsh, mjperrone, slydon, sytten, teamdoug, trane9991, willsewell

kinsumer's Issues

Blocked at `.Next()`

Here is a sample of our code, rewritten to use kinsumer instead of https://github.com/harlow/kinesis-consumer.
It worked with kinesis-consumer, but there we could only have one consumer per application.
With the following code, however, we are stuck at k.Next(): the last log line we see is "before Next".

	s := session.Must(session.NewSession(&aws.Config{
		Endpoint:   cfg.KinesisEndpoint,
		Region:     cfg.AWSRegion,
	}))
	config := kinsumer.NewConfig().
		WithThrottleDelay(time.Second).
		WithShardCheckFrequency(time.Second)
	k, _ := kinsumer.NewWithSession(s, streamName, fmt.Sprintf("%s_my_app", env), uuid.New().String(), config)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			log.Info("before Next")
			k.Next()
			log.Info("inside Next")
		}
	}()
	wg.Wait()

Does anyone have an idea what we did wrong?
Thank you

Manually update checkpoint

Hi!
Is it possible to update the checkpoint manually?
My use case is that processing a record can fail temporarily, and I would like to be able to replay the same record.
Thanks

Problem using this package as a Go module

Hi!

I have a few issues with the current uuid library and even opened an issue in golang (#34432). From what I gathered in that issue, the easiest fix is simply to change the uuid library.

I will open a PR that switches to Google's UUID library, which works pretty much as-is, and adds a go.mod file.

This is the error that is happening:

github.com/twitchscience/kinsumer imports
	github.com/twinj/uuid tested by
	github.com/twinj/uuid.test imports
	gopkg.in/stretchr/testify.v1/assert: cannot find module providing package gopkg.in/stretchr/testify.v1/assert

Startup takes 1 minute when DynamoDB shard cache doesn't exist

When a Kinsumer starts for the very first time, it takes quite a while to start receiving events, because Kinsumer doesn't read the shard IDs from Kinesis. It only checks the cache in DynamoDB, which is naturally empty since it was just created.

The only place where DescribeStream is called to get shard IDs is performLeaderActions, which by default runs once a minute. This means that a Kinsumer started for the first time effectively sleeps for 60 seconds.

Kinsumer should have a flag, firstRun, that, when true, skips the DynamoDB cache and calls loadShardIDsFromKinesis directly.
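The proposed flag could look roughly like the sketch below. `loadFromCache` and `loadFromKinesis` are hypothetical function fields standing in for the DynamoDB cache read and for `loadShardIDsFromKinesis`. The sketch also falls back to Kinesis whenever the cache is empty, which would cover a first start even without the flag.

```go
package main

import "fmt"

// shardLoader chooses where to read shard IDs from. Both function fields are
// hypothetical stand-ins: loadFromCache for the DynamoDB shard cache and
// loadFromKinesis for a DescribeStream/ListShards call.
type shardLoader struct {
	firstRun        bool
	loadFromCache   func() ([]string, error)
	loadFromKinesis func() ([]string, error)
}

// shardIDs skips the cache on the first run, and also falls back to Kinesis
// whenever the cache comes back empty, so a fresh deployment does not wait
// for the next leader cycle.
func (l *shardLoader) shardIDs() ([]string, error) {
	if !l.firstRun {
		ids, err := l.loadFromCache()
		if err != nil {
			return nil, fmt.Errorf("reading shard cache: %w", err)
		}
		if len(ids) > 0 {
			return ids, nil
		}
	}
	l.firstRun = false
	return l.loadFromKinesis()
}

func main() {
	l := &shardLoader{
		firstRun:        true,
		loadFromCache:   func() ([]string, error) { return nil, nil },
		loadFromKinesis: func() ([]string, error) { return []string{"shardId-000000000000"}, nil },
	}
	ids, err := l.shardIDs()
	fmt.Println(ids, err)
}
```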

What should user do when Next() returns an error?

We saw some sporadic commit errors, "shard error (shardId-000000000015) in checkpointer.commit: error committing checkpoint: RequestError: send request failed
caused by: Post https://dynamodb.us-east-1.amazonaws.com/: read tcp 10.0.0.2:34560->52.94.0.96:443: read: connection reset by peer", and noticed that processing of the shard had stopped. It looks like the shard consumer stops processing when it receives a commit error. In the noop consumer, the error is logged with a fatal log call, which exits the application. What should a production app do? Is it safe to call Run() again and carry on processing with Next()? Is this a bug in the library?

kinesisStreamReady produces LimitExceededException when calling DescribeStream

We are experiencing very frequent LimitExceededExceptions when starting our kinsumer applications. This is a result of the hard 10 TPS rate limit on DescribeStream calls in AWS.

Right now we are avoiding this by creating a kinsumer with the NewWithInterfaces constructor and passing in a custom kinesisiface.KinesisAPI with a workaround implementation of DescribeStream that calls ListShards instead.

It would be great if there were a config toggle to use an alternate method of checking Kinesis stream readiness, so that we can avoid this issue.

Application Ids and Concurrent Apps using Kinsumer

(First of all: thanks for being awesome-y)

Not really an issue, but a question: how relevant are application IDs to parallelization?

We'd like to run two kinsumer instances of the same application in order to improve throughput.

Are there any recommendations, best practices or things to watch out for?

Data race during Stop()

This is very simple to reproduce with noopkinsumer, but to get it to build at all with the latest AWS SDK, a small fix is necessary:

git diff .
diff --git a/cmd/noopkinsumer/main.go b/cmd/noopkinsumer/main.go
index 984c5ba..780e869 100644
--- a/cmd/noopkinsumer/main.go
+++ b/cmd/noopkinsumer/main.go
@@ -57,7 +57,9 @@ func initKinsumer() {
        }
 
        config := kinsumer.NewConfig().WithStats(stats)
-       session := session.Must(session.NewSession(aws.NewConfig()))
+       ac := aws.NewConfig()
+       ac.Region = aws.String("us-east-1")
+       session := session.Must(session.NewSession(ac))
 
        // kinsumer needs a way to differentiate between running clients, generally you want to use information
        // about the machine it is running on like ip. For this example we'll use a uuid

Then simply build the binary with the -race flag:

$ cd cmd/noopkinsumer
$ go build -race .
$ ./noopkinsumer -stream example -createTables

and send an interrupt signal with Ctrl-C.

Results:

./noopkinsumer -stream anlytics-testing-stream -createTables
^C2018/11/21 11:42:12 Total records consumed 123
==================
WARNING: DATA RACE
Write at 0x00c420092a50 by goroutine 48:
  github.com/twitchscience/kinsumer.(*Kinsumer).Run.func1.1()
      /home/dcelasun/go/src/github.com/twitchscience/kinsumer/kinsumer.go:362 +0x22e
  github.com/twitchscience/kinsumer.(*Kinsumer).Run.func1()
      /home/dcelasun/go/src/github.com/twitchscience/kinsumer/kinsumer.go:403 +0xa5f

Previous read at 0x00c420092a50 by goroutine 47:
  github.com/twitchscience/kinsumer.(*Kinsumer).becomeLeader.func1()
      /home/dcelasun/go/src/github.com/twitchscience/kinsumer/leader.go:79 +0x226

Goroutine 48 (running) created at:
  github.com/twitchscience/kinsumer.(*Kinsumer).Run()
      /home/dcelasun/go/src/github.com/twitchscience/kinsumer/kinsumer.go:350 +0x4c7
  main.runKinsumer()
      /home/dcelasun/go/src/github.com/twitchscience/kinsumer/cmd/noopkinsumer/main.go:82 +0x4e
  main.main()
      /home/dcelasun/go/src/github.com/twitchscience/kinsumer/cmd/noopkinsumer/main.go:136 +0x86

Goroutine 47 (running) created at:
  github.com/twitchscience/kinsumer.(*Kinsumer).becomeLeader()
      /home/dcelasun/go/src/github.com/twitchscience/kinsumer/leader.go:43 +0xef
  github.com/twitchscience/kinsumer.(*Kinsumer).refreshShards()
      /home/dcelasun/go/src/github.com/twitchscience/kinsumer/kinsumer.go:154 +0x7d4
  github.com/twitchscience/kinsumer.(*Kinsumer).Run()
      /home/dcelasun/go/src/github.com/twitchscience/kinsumer/kinsumer.go:340 +0x158
  main.runKinsumer()
      /home/dcelasun/go/src/github.com/twitchscience/kinsumer/cmd/noopkinsumer/main.go:82 +0x4e
  main.main()
      /home/dcelasun/go/src/github.com/twitchscience/kinsumer/cmd/noopkinsumer/main.go:136 +0x86
==================
Found 1 data race(s)

Error using the same application name in two different streams

I created consumers reading from two different streams, using the same application name for both.

When starting the application, I intermittently get this error:

InvalidArgumentException: StartingSequenceNumber 49598037874099682602785444260063985364880176723926188034 used in GetShardIterator on shard shardId-000000000000 in stream dummy under account 716421886079 is invalid because it did not come from this stream

StartConsuming is called from main and calls a consume function for each stream, with the same application name but a different stream name.

func StartConsuming(conf *config.Config, waitForKinesis *sync.WaitGroup) {

	waitForKinesis.Add(1)
	streamNameA := "logistics-executor"
	go consumeStreamA(streamNameA, conf.Application.Name, waitForKinesis)
	//
	//waitForKinesis.Add(1)
	//streamNameB := "dummy2"
	//go consumeStreamB(streamNameB, conf.Application.Name, waitForKinesis)
}
func consumeStreamB(streamName, applicationName string, waitForKinesis *sync.WaitGroup) {
	recordsB := make(chan []byte)
	k, wg := kinesis.Init(recordsB, streamName, applicationName, false)
	readFromStreamB(recordsB)
	logger.Log.Infof("Stopping K from stream B")
	k.Stop()

	logger.Log.Infof("waiting on wg from stream B")
	wg.Wait()
	logger.Log.Infof("done on wg from stream B")
	waitForKinesis.Done()

}

func readFromStreamB(records chan []byte) {
	logger.Log.Infof("Reading inside ReadFromStream B")
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGINT)

	for {
		//time.Sleep(2 * time.Second)
		select {
		case <-sigc:
			logger.Log.Infof("sigc readFromStreamB")
			return

		case record := <-records:
			logger.Log.Infof("ReadFromStream B %v\n", string(record))
			//logger.Log.Infof("perform business logic here B !! ")
		}
	}
}
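A plausible explanation: kinsumer appears to keep its state in DynamoDB tables named after the application, with checkpoints keyed by shard ID. Shard IDs such as `shardId-000000000000` repeat across streams, so a sequence number saved for one stream can be handed to `GetShardIterator` for the other, which matches the error. Assuming that explanation, a simple workaround is a distinct application name per stream. The hypothetical helper below builds one and replaces characters that DynamoDB does not allow in table names.

```go
package main

import (
	"fmt"
	"regexp"
)

// invalidChars matches characters outside the set allowed in DynamoDB table
// names (letters, digits, '_', '-', '.').
var invalidChars = regexp.MustCompile(`[^A-Za-z0-9_.-]`)

// perStreamAppName derives a distinct application name for each stream so
// that consumers of different streams do not share checkpoint state.
func perStreamAppName(appName, streamName string) string {
	return invalidChars.ReplaceAllString(appName+"_"+streamName, "_")
}

func main() {
	for _, stream := range []string{"logistics-executor", "dummy2"} {
		fmt.Println(perStreamAppName("my-app", stream))
	}
}
```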

Kinsumer doesn't refresh the shard iterator after 5 minutes

I get an error that Kinsumer failed to refresh the shard iterator after 5 minutes. It reproduces every time I run Kinsumer.
kinsumer/logger.go:16 Got error: Iterator expired. The iterator was created at time Wed Jan 15 05:31:22 UTC 2020 while right now it is Wed Jan 15 05:37:43 UTC 2020 which is further in the future than the tolerated delay of 300000 milliseconds. (%!s()) retry count is 3 / 3
2020-01-14T21:37:43.895-0800 ERROR k.Next returned error {"error": "shard error (shardId-000000000001) in getRecords: ExpiredIteratorException: Iterator expired. The iterator was created at time Wed Jan 15 05:31:22 UTC 2020 while right now it is Wed Jan 15 05:37:43 UTC 2020 which is further in the future than the tolerated delay of 300000 milliseconds.\n\tstatus code: 400, request id: d6e4061f-161f-c6f2-807c-a178d6bd00d3"}

Manual commit checkpoint to ddb table

We want to run multiple instances of a Kinesis consumer app in a k8s environment.
To make sure that we process the data at least once, we need to be able to read a record from the stream, perform some work, and only then commit the shard position to the DDB table.

I was wondering whether this can be done with kinsumer.

Usage example

Would it be possible to add some example code for using this library to the README or to a subdirectory?

Using kinsumer across AWS accounts delays or loses data

I am using kinsumer across AWS accounts and see significant delays, and sometimes lost data, when reading from Next(). In the same account as the Kinesis stream, kinsumer works fine, but not from a different AWS account.
As there are multiple consumers (3 in total), I have increased the throttleDelay to 750ms, but that does not help much. My use case is to initialize the kinsumer, run it, and stop it once every 500ms.
Is this a known issue with kinsumer? Is there any solution?

Please consider a more permissive license

Hi, I've been looking at the options for writing a Kinesis worker in Go without using the MultiLangDaemon feature of KCL. I've been considering this library, but its license (Amazon Software License) is a bit of a stumbling block.

The commit adding the license doesn't indicate why ASL was chosen. It's really meant for use by software released by Amazon or derived from Amazon's software. Is it possible that it was chosen because of the relation to the aws libraries in use?

Using the kinesis library apis isn't a derivative work under the terms of the ASL by my reading ("...for the purposes of this License, derivative works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work").

Would you consider re-licensing under a more permissive license, such as MIT or BSD? That is a better fit for an open source project, and it means developers aren't constrained by the terms of the Amazon license, which adds a bunch of rights in Amazon's favour to the code you wrote. Re-licensing would involve removing the current license files from the repository, adding the appropriate license information (depending on your choice of license) and adding a License section to the README.

Thanks for your consideration!

Issue reading kinesis stream across different aws accounts

Hi,

I am trying to initialize a new Kinsumer to read a Kinesis stream in a different AWS account. I want to use a different session and credentials for it, but I can't find any method for this that kinsumer currently supports. Could someone explain how to use Kinsumer to read streams across AWS accounts?
I also have a Kinesis client object. Can I use it to initialize kinsumer?

De-aggregation of records

Hi @garethlewin,

We have a producer that aggregates records with the KPL before sending them, and we're consuming the records using this library. But we are getting garbage values in the records, as mentioned here.
Does this library take care of de-aggregating records?

Non-stop ConditionalCheckFailedException errors

Hello there, I'm now constantly getting the following error:

shard error (shardId-000000000005) in checkpointer.release: error releasing checkpoint: ConditionalCheckFailedException: The conditional request failed\n\tstatus code: 400, request id: RR9U1G5VFUCV6AHGT5DFJIPNJVVV4KQNSO5AEMVJF66Q9ASUAAJG

Any idea what's happened and how to fix that?

BTW, I noticed ticket #27.

Thank you
