twitchscience / kinsumer
Native Go consumer for AWS Kinesis streams.
License: Other
Here is a sample of our code, rewritten to use kinsumer instead of https://github.com/harlow/kinesis-consumer.
It worked with kinesis-consumer, but we could only have one consumer per application.
With the following code we are stuck at k.Next(): the last log line we see is "before Next".
s := session.Must(session.NewSession(&aws.Config{
	Endpoint: cfg.KinesisEndpoint,
	Region:   cfg.AWSRegion,
}))
config := kinsumer.NewConfig().
	WithThrottleDelay(time.Second).
	WithShardCheckFrequency(time.Second)
k, _ := kinsumer.NewWithSession(s, streamName, fmt.Sprintf("%s_my_app", env), uuid.New().String(), config)
var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	for {
		log.Info("before Next")
		k.Next()
		log.Info("inside Next")
	}
}()
wg.Wait()
Does anyone have any idea what we did wrong?
Thank you
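One thing missing from the snippet above is a call to Run(): as we understand kinsumer, the consumer is started with Run() before Next() is polled (the noopkinsumer example calls Run() from runKinsumer), and the error returned by NewWithSession is also being discarded. The sketch below models that lifecycle without AWS. The recordSource interface, fakeSource and consumeAll are hypothetical names that only mirror the Run/Next/Stop shape; treating nil data from Next() as "stopped" is an assumption about kinsumer's behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// recordSource is a hypothetical interface mirroring a Run/Next/Stop consumer;
// it is NOT imported from kinsumer.
type recordSource interface {
	Run() error
	Next() ([]byte, error)
	Stop()
}

// fakeSource is an in-memory stand-in so the sketch runs without AWS.
type fakeSource struct {
	started bool
	records [][]byte
}

func (f *fakeSource) Run() error { f.started = true; return nil }

func (f *fakeSource) Next() ([]byte, error) {
	if !f.started {
		// A real consumer may simply block here; the fake reports it instead.
		return nil, errors.New("next called before run")
	}
	if len(f.records) == 0 {
		return nil, nil // nil data: treated as "consumer stopped"
	}
	r := f.records[0]
	f.records = f.records[1:]
	return r, nil
}

func (f *fakeSource) Stop() {}

// consumeAll starts the source first, then polls Next until it yields nil data.
func consumeAll(src recordSource, handle func([]byte)) error {
	if err := src.Run(); err != nil {
		return fmt.Errorf("run: %w", err)
	}
	defer src.Stop()
	for {
		data, err := src.Next()
		if err != nil {
			return fmt.Errorf("next: %w", err)
		}
		if data == nil {
			return nil
		}
		handle(data)
	}
}

func main() {
	src := &fakeSource{records: [][]byte{[]byte("a"), []byte("b")}}
	var got []string
	if err := consumeAll(src, func(b []byte) { got = append(got, string(b)) }); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got)
}
```

Running it prints [a b]; calling Next() on a fakeSource that was never Run() returns an error instead of data.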
Hi!
Is it possible to update the checkpoint manually?
My use case is that the processing of a record can fail temporarily and I would like to be able to replay the same record again.
Thanks
Hi!
I have a few issues with the current uuid library and even opened an issue in golang (#34432); from what I gathered there, the easiest fix is simply to change the uuid library.
I will open a PR that switches to Google's UUID library (it pretty much works as-is) and adds a go.mod file.
This is the error that is happening:
github.com/twitchscience/kinsumer imports
github.com/twinj/uuid tested by
github.com/twinj/uuid.test imports
gopkg.in/stretchr/testify.v1/assert: cannot find module providing package gopkg.in/stretchr/testify.v1/assert
The title says it all. The fix is not obvious, though, since it uses a blocking channel internally.
When a Kinsumer is started for the very first time, it takes quite a while before events arrive, because Kinsumer does not read the shard IDs from Kinesis. It only checks the cache in DynamoDB, which is naturally empty since the table was just created.
The only place where DescribeStream is called to get shard IDs is performLeaderActions, which by default runs once a minute. As a result, a Kinsumer started for the first time effectively sleeps for 60 seconds.
Kinsumer should have a flag, firstRun, that skips the DynamoDB cache and calls loadShardIDsFromKinesis directly when true.
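A closely related variant keys the decision on an empty cache rather than an explicit firstRun flag. In this sketch the two loaders are injected functions standing in for the DynamoDB cache lookup and loadShardIDsFromKinesis; loadShardIDs and its signature are hypothetical, not kinsumer's internals.

```go
package main

import "fmt"

// loadShardIDs prefers the cached shard list but queries the stream directly
// when the cache is empty (e.g. on the very first run), instead of waiting for
// the next leader cycle. Both loaders are hypothetical, injected stand-ins.
func loadShardIDs(fromCache, fromKinesis func() ([]string, error)) ([]string, error) {
	ids, err := fromCache()
	if err != nil {
		return nil, fmt.Errorf("reading shard cache: %w", err)
	}
	if len(ids) > 0 {
		return ids, nil
	}
	ids, err = fromKinesis()
	if err != nil {
		return nil, fmt.Errorf("listing shards: %w", err)
	}
	return ids, nil
}

func main() {
	emptyCache := func() ([]string, error) { return nil, nil }
	kinesis := func() ([]string, error) {
		return []string{"shardId-000000000000", "shardId-000000000001"}, nil
	}
	ids, err := loadShardIDs(emptyCache, kinesis)
	fmt.Println(ids, err)
}
```

An empty-cache check has the advantage of also covering restarts after the cache was wiped, at the cost of one extra stream call in that case.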
We saw sporadic errors when committing:
"shard error (shardId-000000000015) in checkpointer.commit: error committing checkpoint: RequestError: send request failed
caused by: Post https://dynamodb.us-east-1.amazonaws.com/: read tcp 10.0.0.2:34560->52.94.0.96:443: read: connection reset by peer"
After that, processing of the shard had stopped: it looks like the shard consumer stops processing when it receives a commit error. In the noop consumer there is a log.Fatal, which exits the application. What should a production app do? Is it safe to call Run() again and carry on processing with Next()? Is this a bug in the library?
In an effort to limit the number of tables, it would be very useful to use the stream name as the namespace so we can share them.
We are experiencing very frequent LimitExceededExceptions when starting our kinsumer applications. This is a result of the hard 10 TPS rate limit on DescribeStream calls in AWS.
Right now we are avoiding this by creating a kinsumer with the NewWithInterfaces constructor and passing in a custom kinesisiface.KinesisAPI with a workaround implementation of DescribeStream that calls ListShards instead.
It would be great if there were a config toggle to use an alternate method for checking Kinesis stream readiness, so that we can avoid this issue.
I tried the sample consumer, passing the region and all other details in the AWS config, but I keep getting the error below:
Credential should be scoped to correct service: 'kinesis'
(First of all: thanks for being awesome-y)
Not really an issue, but a question: how do application IDs relate to parallelization?
We'd like to run two kinsumer instances of the same application in order to improve throughput.
Are there any recommendations, best practices or things to watch out for?
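As we understand the design, instances that should share the work keep the same application name and differ only in their client name; the noopkinsumer comment notes that clients must be told apart, suggesting machine information such as an IP (the example itself uses a UUID). Since each shard is read by one client at a time, extra instances likely help only up to the shard count. The helper below is a hypothetical way to build a stable per-process client name.

```go
package main

import (
	"fmt"
	"os"
)

// clientName builds a per-process identifier from hostname and PID. In
// Kubernetes the hostname is normally the pod name, so it is unique per pod.
func clientName() string {
	host, err := os.Hostname()
	if err != nil || host == "" {
		host = "unknown-host"
	}
	return fmt.Sprintf("%s-%d", host, os.Getpid())
}

func main() {
	fmt.Println(clientName())
}
```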
There is a log.Printf here:
https://github.com/twitchscience/kinsumer/blob/master/shard_consumer.go#L192
Go has no standard logging interface, but it would be very nice if we could plug in our own logger (we use zap).
This one is very simple to reproduce with noopkinsumer, but to get it to build at all with the latest AWS SDK, a small fix is necessary:
git diff .
diff --git a/cmd/noopkinsumer/main.go b/cmd/noopkinsumer/main.go
index 984c5ba..780e869 100644
--- a/cmd/noopkinsumer/main.go
+++ b/cmd/noopkinsumer/main.go
@@ -57,7 +57,9 @@ func initKinsumer() {
}
config := kinsumer.NewConfig().WithStats(stats)
- session := session.Must(session.NewSession(aws.NewConfig()))
+ ac := aws.NewConfig()
+ ac.Region = aws.String("us-east-1")
+ session := session.Must(session.NewSession(ac))
// kinsumer needs a way to differentiate between running clients, generally you want to use information
// about the machine it is running on like ip. For this example we'll use a uuid
Then build the binary with the -race flag:
$ cd cmd/noopkinsumer
$ go build -race .
$ ./noopkinsumer -stream example -createTables
and send an interrupt signal with Ctrl-C.
Results:
./noopkinsumer -stream anlytics-testing-stream -createTables
^C2018/11/21 11:42:12 Total records consumed 123
==================
WARNING: DATA RACE
Write at 0x00c420092a50 by goroutine 48:
github.com/twitchscience/kinsumer.(*Kinsumer).Run.func1.1()
/home/dcelasun/go/src/github.com/twitchscience/kinsumer/kinsumer.go:362 +0x22e
github.com/twitchscience/kinsumer.(*Kinsumer).Run.func1()
/home/dcelasun/go/src/github.com/twitchscience/kinsumer/kinsumer.go:403 +0xa5f
Previous read at 0x00c420092a50 by goroutine 47:
github.com/twitchscience/kinsumer.(*Kinsumer).becomeLeader.func1()
/home/dcelasun/go/src/github.com/twitchscience/kinsumer/leader.go:79 +0x226
Goroutine 48 (running) created at:
github.com/twitchscience/kinsumer.(*Kinsumer).Run()
/home/dcelasun/go/src/github.com/twitchscience/kinsumer/kinsumer.go:350 +0x4c7
main.runKinsumer()
/home/dcelasun/go/src/github.com/twitchscience/kinsumer/cmd/noopkinsumer/main.go:82 +0x4e
main.main()
/home/dcelasun/go/src/github.com/twitchscience/kinsumer/cmd/noopkinsumer/main.go:136 +0x86
Goroutine 47 (running) created at:
github.com/twitchscience/kinsumer.(*Kinsumer).becomeLeader()
/home/dcelasun/go/src/github.com/twitchscience/kinsumer/leader.go:43 +0xef
github.com/twitchscience/kinsumer.(*Kinsumer).refreshShards()
/home/dcelasun/go/src/github.com/twitchscience/kinsumer/kinsumer.go:154 +0x7d4
github.com/twitchscience/kinsumer.(*Kinsumer).Run()
/home/dcelasun/go/src/github.com/twitchscience/kinsumer/kinsumer.go:340 +0x158
main.runKinsumer()
/home/dcelasun/go/src/github.com/twitchscience/kinsumer/cmd/noopkinsumer/main.go:82 +0x4e
main.main()
/home/dcelasun/go/src/github.com/twitchscience/kinsumer/cmd/noopkinsumer/main.go:136 +0x86
==================
Found 1 data race(s)
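The report shows one goroutine writing a field in Run (kinsumer.go:362) while the leader goroutine reads it (leader.go:79). Which field that is isn't visible here, so the sketch uses a hypothetical stopping flag to show the usual fix: route both accesses through sync/atomic (a mutex or a channel would also work).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// consumerState is a hypothetical struct with a field written by one goroutine
// and read by another; stopping is only ever accessed via sync/atomic.
type consumerState struct {
	stopping int32
}

func (s *consumerState) setStopping()     { atomic.StoreInt32(&s.stopping, 1) }
func (s *consumerState) isStopping() bool { return atomic.LoadInt32(&s.stopping) == 1 }

func main() {
	var s consumerState
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // writer, like the Run goroutine during shutdown
		defer wg.Done()
		s.setStopping()
	}()
	go func() { // reader, like the leader goroutine polling state
		defer wg.Done()
		_ = s.isStopping()
	}()
	wg.Wait()
	fmt.Println("stopping:", s.isStopping())
}
```

Built with go run -race, this version should not report a race, whereas a plain bool written and read from the two goroutines would.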
I created consumers reading from two different streams, and I am using the same application name for both.
On starting the application I am getting this error intermittently:
InvalidArgumentException: StartingSequenceNumber 49598037874099682602785444260063985364880176723926188034 used in GetShardIterator on shard shardId-000000000000 in stream dummy under account 716421886079 is invalid because it did not come from this stream
StartConsuming gets called from main; it calls consumeStream with the same application name but a different stream name.
func StartConsuming(conf *config.Config, waitForKinesis *sync.WaitGroup) {
	waitForKinesis.Add(1)
	streamNameA := "logistics-executor"
	go consumeStreamA(streamNameA, conf.Application.Name, waitForKinesis)
	//
	//waitForKinesis.Add(1)
	//streamNameB := "dummy2"
	//go consumeStreamB(streamNameB, conf.Application.Name, waitForKinesis)
}

func consumeStreamB(streamName, applicationName string, waitForKinesis *sync.WaitGroup) {
	recordsB := make(chan []byte)
	k, wg := kinesis.Init(recordsB, streamName, applicationName, false)
	readFromStreamB(recordsB)
	logger.Log.Infof("Stopping K from stream B")
	k.Stop()
	logger.Log.Infof("waiting on wg from stream B")
	wg.Wait()
	logger.Log.Infof("done on wg from stream B")
	waitForKinesis.Done()
}

func readFromStreamB(records chan []byte) {
	logger.Log.Infof("Reading inside ReadFromStream B")
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGINT)
	for {
		//time.Sleep(2 * time.Second)
		select {
		case <-sigc:
			logger.Log.Infof("sigc readFromStreamB")
			return
		case record := <-records:
			logger.Log.Infof("ReadFromStream B %v\n", string(record))
			//logger.Log.Infof("perform business logic here B !! ")
		}
	}
}
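The error says the sequence number "did not come from this stream", and shard IDs such as shardId-000000000000 exist in both streams. As far as we can tell, kinsumer's DynamoDB tables are namespaced by application name and checkpoints are keyed by shard ID, so two streams under one application name can collide. A minimal workaround, assuming that diagnosis, is one application name per stream; applicationNameFor is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// applicationNameFor derives a per-stream application name so each stream
// gets its own tables. Characters outside the set DynamoDB allows in table
// names (a-z, A-Z, 0-9, '_', '-', '.') are replaced with '_'.
func applicationNameFor(base, stream string) string {
	clean := strings.Map(func(r rune) rune {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9',
			r == '_', r == '-', r == '.':
			return r
		}
		return '_'
	}, stream)
	return fmt.Sprintf("%s_%s", base, clean)
}

func main() {
	fmt.Println(applicationNameFor("myapp", "logistics-executor"))
	fmt.Println(applicationNameFor("myapp", "dummy2"))
}
```

The cost is one set of tables per stream, which is the trade-off the stream-namespace request earlier in this list is trying to avoid.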
I get an error saying that Kinsumer failed to refresh the shard iterator after 5 minutes. It reproduces every time I run Kinsumer.
kinsumer/logger.go:16 Got error: Iterator expired. The iterator was created at time Wed Jan 15 05:31:22 UTC 2020 while right now it is Wed Jan 15 05:37:43 UTC 2020 which is further in the future than the tolerated delay of 300000 milliseconds. (%!s()) retry count is 3 / 3
2020-01-14T21:37:43.895-0800 ERROR k.Next returned error {"error": "shard error (shardId-000000000001) in getRecords: ExpiredIteratorException: Iterator expired. The iterator was created at time Wed Jan 15 05:31:22 UTC 2020 while right now it is Wed Jan 15 05:37:43 UTC 2020 which is further in the future than the tolerated delay of 300000 milliseconds.\n\tstatus code: 400, request id: d6e4061f-161f-c6f2-807c-a178d6bd00d3"}
We want to run multiple instances of a Kinesis consumer app in a k8s environment.
To make sure we process the data at least once, we need to be able to take a record from the stream, perform some work, and only then commit the shard position to the DynamoDB table.
I was wondering whether this can be done with kinsumer?
Would it be possible to add some example code for using this library to the README or to a subdirectory?
Hi. I'd like to define the initial position in the stream, but I don't see how to set such a parameter in kinsumer.NewConfig().
Thank you
Thoughts on supporting enhanced fan-out? https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html
I am using kinsumer across AWS accounts and see significant delays, and sometimes lost data, when reading from Next(). In the same account as the Kinesis stream kinsumer works fine, but not from a different AWS account.
As there are multiple consumers (3 in total), I increased throttleDelay to 750ms, but that does not help much. My use case is to initialize the kinsumer, run it, and stop it once every 500ms.
Is this a known issue with kinsumer? Is there a solution?
Hi, I've been looking at the options for writing a Kinesis worker in Go without using the MultiLangDaemon feature of KCL. I've been considering this library, but the license (Amazon Software License) is a bit of a stumbling block.
The commit adding the license doesn't indicate why ASL was chosen. It's really meant for use by software released by Amazon or derived from Amazon's software. Is it possible that it was chosen because of the relation to the aws libraries in use?
Using the kinesis library apis isn't a derivative work under the terms of the ASL by my reading ("...for the purposes of this License, derivative works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work").
Would you consider re-licensing under a more permissive license, perhaps either of the MIT or BSD licenses? This is a better fit as an open source project and means developers aren't constrained by the terms of the Amazon License which adds a bunch of rights in Amazon's favour to the code you wrote. Re-licensing would involve removing the current license files from the repository, adding the appropriate license information (depending on your choice of license) and adding a License section to the README.
Thanks for your consideration!
The badge in the README shows build status "unknown" and links to https://travis-ci.org/TwitchScience/kinsumer, which reports "We couldn't find the repository TwitchScience/kinsumer".
f4c965d is the last commit to have a travis build status flag (which links to https://travis-ci.org/twitchscience/kinsumer/builds/415983838?utm_source=github_status&utm_medium=notification).
Hi,
I am trying to initialize a new Kinsumer to read a Kinesis stream across different AWS accounts. I want to use a different session and config credentials for it, but I can't find any method for this that kinsumer currently supports. Could someone guide me on how to use Kinsumer to read streams across AWS accounts?
I also have a Kinesis client object. Can I use it to initialize Kinsumer?
Hi. I want to produce batched records from my producers using https://github.com/tj/go-kinesis or https://github.com/a8m/kinesis-producer, but kinsumer doesn't seem to see any records produced in a batch.
Is it possible to work this way? If so, is there an example?
Thank you
Hi @garethlewin,
We have a producer that aggregates and sends records according to the KPL, and we consume them with this library. However, we are getting garbage values in the records, as mentioned here.
Does this library take care of de-aggregation of records?
Hello there, I'm now always getting the following error:
shard error (shardId-000000000005) in checkpointer.release: error releasing checkpoint: ConditionalCheckFailedException: The conditional request failed\n\tstatus code: 400, request id: RR9U1G5VFUCV6AHGT5DFJIPNJVVV4KQNSO5AEMVJF66Q9ASUAAJG
Any idea what happened and how to fix it?
BTW, I noticed ticket #27.
Thank you