
amazon-kinesis-client-python's Introduction

Amazon Kinesis Client Library for Python


This package provides an interface to the Amazon Kinesis Client Library (KCL) MultiLangDaemon, which is part of the Amazon KCL for Java. Developers can use the Amazon KCL to build distributed applications that process streaming data reliably at scale. The Amazon KCL takes care of many of the complex tasks associated with distributed computing, such as load-balancing across multiple instances, responding to instance failures, checkpointing processed records, and reacting to changes in stream volume. This interface manages the interaction with the MultiLangDaemon so that developers can focus on implementing their record processor executable. A record processor executable typically looks something like:

    #!/usr/bin/env python
    from amazon_kclpy import kcl
    import json, base64

    class RecordProcessor(kcl.RecordProcessorBase):

        def initialize(self, initialize_input):
            pass

        def process_records(self, process_records_input):
            pass

        def lease_lost(self, lease_lost_input):
            pass

        def shard_ended(self, shard_ended_input):
            pass

        def shutdown_requested(self, shutdown_requested_input):
            pass

    if __name__ == "__main__":
        kclprocess = kcl.KCLProcess(RecordProcessor())
        kclprocess.run()
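As a minimal sketch of what a process_records body might do, the code below base64-decodes each record, parses it as JSON, and tracks the largest sequence number seen so the processor knows where it could checkpoint. FakeRecord and handle_batch are hypothetical stand-ins so the snippet runs without the MultiLangDaemon; the binary_data name follows the Record object described in the release notes below, while the sequence_number attribute and the JSON payload format are assumptions.

```python
import base64
import json


class FakeRecord:
    """Hypothetical stand-in for a KCL record: base64 data plus a sequence number."""

    def __init__(self, data_b64, sequence_number):
        self.data = data_b64
        self.sequence_number = sequence_number

    @property
    def binary_data(self):
        # Base64-decode the payload, as binary_data is described as doing.
        return base64.b64decode(self.data)


def handle_batch(records, start_seq=None):
    """Decode JSON payloads; return (decoded_items, largest_sequence_number_seen)."""
    items = []
    largest = start_seq
    for record in records:
        items.append(json.loads(record.binary_data.decode("utf-8")))
        # Kinesis sequence numbers are large numeric strings; compare them as ints.
        seq = int(record.sequence_number)
        if largest is None or seq > largest:
            largest = seq
    return items, largest
```

In a real processor you would then checkpoint at str(largest) through the checkpointer that accompanies process_records; check the processor module of your installed version for the exact call signature.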

Before You Get Started

Before running the samples, you'll want to make sure that your environment is configured to allow the samples to use your AWS Security Credentials.

By default the samples use the DefaultCredentialsProvider, so you'll want to make your credentials available to one of the credentials providers in that provider chain. There are several ways to do this, such as providing a ~/.aws/credentials file or, if you're running on EC2, associating your instance with an IAM role that has the appropriate access.

For questions regarding the Amazon Kinesis service and the client libraries, please visit the Amazon Kinesis Forums.

Running the Sample

Using the amazon_kclpy package requires the MultiLangDaemon which is provided by the Amazon KCL for Java. These jars will be downloaded automatically by the install command, but you can explicitly download them with the download_jars command. From the root of this repo, run:

    python setup.py download_jars
    python setup.py install

Now amazon_kclpy, boto (used by the sample putter script), and the required jars should be installed in your environment. To start the sample putter, run:

    sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster

This will create an Amazon Kinesis stream called words and put the words specified by the -w options into the stream once each. Use -p SECONDS to indicate a period over which to repeatedly put these words.
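The putter's behavior can be sketched as a loop over the words, with an optional sleep between rounds when a period is given. put_record here is a hypothetical callback standing in for the real Kinesis PutRecord call, rounds is a hypothetical parameter added so the loop terminates, and using each word as its own partition key is an assumption rather than a description of sample_kinesis_wordputter.py.

```python
import time
from typing import Callable, Iterable, Optional


def put_words(words: Iterable[str],
              put_record: Callable[[str, str], None],
              period_seconds: Optional[float] = None,
              rounds: int = 1,
              sleep: Callable[[float], None] = time.sleep) -> int:
    """Put each word once per round, sleeping period_seconds between rounds if given.

    put_record(data, partition_key) is a hypothetical callback; returns the number of puts.
    """
    words = list(words)
    count = 0
    for round_index in range(rounds):
        for word in words:
            # Assumption: the word doubles as its partition key, spreading words across shards.
            put_record(word, word)
            count += 1
        # Only wait between rounds, not after the last one.
        if period_seconds is not None and round_index < rounds - 1:
            sleep(period_seconds)
    return count
```

Injecting put_record and sleep keeps the loop testable without an AWS account; a real putter would pass a function that calls the Kinesis client.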

Now we would like to run an Amazon KCL for Python application that reads records from the stream we just created. First, though, take a look in the samples directory, where you'll find a file called sample.properties, and cat that file:

    cat samples/sample.properties

You'll see several properties defined there: executableName is the executable for the MultiLangDaemon to run; streamName is the Kinesis stream to read from; appName is the Amazon KCL application name, which is also used as the name of the Amazon DynamoDB table that the Amazon KCL creates; and initialPositionInStream tells the Amazon KCL where to start reading from shards on a fresh startup. To run the sample application, you can use a helper script included in this package. Note that you must provide a path to Java (version 1.7 or greater) to run the Amazon KCL.

    amazon_kclpy_helper.py --print_command \
        --java <path-to-java> --properties samples/sample.properties

This will print the command needed to run the sample, which you can copy and paste, or you can surround the command with backticks to run it directly:

    `amazon_kclpy_helper.py --print_command \
        --java <path-to-java> --properties samples/sample.properties`
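If you prefer to launch from Python rather than with backticks, a sketch like the following captures the printed command and runs it. run_printed_command is a hypothetical helper, not part of amazon_kclpy; unlike shell backticks, it splits the printed command with shlex.split and runs it without a shell.

```python
import shlex
import subprocess


def run_printed_command(helper_argv):
    """Run a helper that prints a command line, then execute that command.

    Returns the exit code of the executed command. Raises CalledProcessError
    if the helper itself fails.
    """
    # Capture the helper's stdout (the printed command) as text.
    printed = subprocess.run(helper_argv, capture_output=True, text=True, check=True).stdout
    # Split respecting shell-style quoting, but do not invoke a shell.
    argv = shlex.split(printed)
    return subprocess.run(argv).returncode
```

For the sample, the call would look like run_printed_command(["amazon_kclpy_helper.py", "--print_command", "--java", "/usr/bin/java", "--properties", "samples/sample.properties"]), assuming the helper script is on your PATH.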

Alternatively, if you don't have the source on hand but want to run the sample app, you can use the --sample argument to use the sample.properties file from the installation location.

    amazon_kclpy_helper.py --print_command --java <path-to-java> --sample
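The sample.properties file described above uses simple key = value lines. If you need to read those settings from Python (for example, to log which stream a worker is consuming), a minimal parser sketch follows. parse_properties is a hypothetical helper, and it deliberately ignores parts of the Java .properties format such as line continuations, escapes, and the key:value separator form.

```python
def parse_properties(text):
    """Parse simple Java-style `key = value` lines into a dict.

    Blank lines and lines starting with '#' or '!' are treated as comments.
    Lines without '=' are skipped in this simplified sketch.
    """
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue
        props[key.strip()] = value.strip()
    return props
```

Usage: with open("samples/sample.properties") as f: props = parse_properties(f.read()), then props.get("streamName").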

Running on EC2

Running on EC2 is simple. Assuming you are already logged into an EC2 instance running Amazon Linux, the following steps will prepare your environment for running the sample app. Note that the Java that ships with Amazon Linux can be found at /usr/bin/java and should be version 1.7 or greater.

    sudo yum install python-pip
    sudo pip install virtualenv
    virtualenv /tmp/kclpy-sample-env
    source /tmp/kclpy-sample-env/bin/activate
    pip install amazon_kclpy

Under the Hood - What You Should Know about Amazon KCL's MultiLangDaemon

Amazon KCL for Python uses Amazon KCL for Java internally. We have implemented a Java-based daemon, called the MultiLangDaemon, that does all the heavy lifting. Our approach has the daemon spawn the user-defined record processor script/program as a sub-process. The MultiLangDaemon communicates with this sub-process over standard input/output using a simple protocol, and therefore the record processor script/program can be written in any language.

At runtime, there will always be a one-to-one correspondence between a record processor, a child process, and an Amazon Kinesis shard. The MultiLangDaemon ensures this without any intervention from the developer.

In this release, we have abstracted these implementation details away and exposed an interface that enables you to focus on writing record processing logic in Python. This approach enables the Amazon KCL to be language agnostic, while providing identical features and a similar parallel processing model across all languages.
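To make the stdin/stdout protocol concrete, here is a simplified sketch of a child-process loop: it reads one JSON message per line, dispatches on an "action" field, and replies with one status line per message. The message shape ({"action": ...} in, {"action": "status", "responseFor": ...} out) reflects our reading of amazon_kclpy's kcl.py and should be checked against your installed version; serve and handlers are hypothetical names, and the real library also handles checkpoint round-trips that this sketch omits.

```python
import json


def serve(input_stream, output_stream, handlers):
    """Read JSON messages line by line, dispatch on "action", and acknowledge each one.

    handlers maps an action name to a callable that receives the decoded message.
    Each acknowledgement is one JSON line, flushed immediately because the daemon
    waits for it before sending the next message.
    """
    for line in input_stream:
        line = line.strip()
        if not line:
            continue
        message = json.loads(line)
        action = message["action"]
        handler = handlers.get(action)
        if handler is not None:
            handler(message)
        output_stream.write(json.dumps({"action": "status", "responseFor": action}) + "\n")
        output_stream.flush()
```

Because replies travel over the child's stdout, anything else a record processor prints to stdout ends up in the same stream the daemon parses; diagnostics are safer on stderr.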

See Also

Release Notes

Release 2.1.4 (April 23, 2024)

  • Upgraded KCL and KCL-Multilang dependencies from 2.5.2 to 2.5.8 PR #239
  • Upgraded ion-java from 1.5.1 to 1.11.4 PR #243
  • Upgraded logback version from 1.3.0 to 1.3.12 PR #242
  • Upgraded io.netty dependency from 4.1.86.Final to 4.1.94.Final PR #234
  • Upgraded Google Guava dependency from 32.0.0-jre to 32.1.1-jre PR #234
  • Upgraded jackson-databind from 2.13.4 to 2.13.5 PR #234
  • Upgraded protobuf-java from 3.21.5 to 3.21.7 PR #234

Release 2.1.3 (August 8, 2023)

  • Added the ability to specify STS endpoint and region PR #221
  • Upgraded KCL and KCL-Multilang Dependencies from 2.5.1 to 2.5.2 PR #221

Release 2.1.2 (June 29, 2023)

  • Added the ability to pass in streamArn to multilang Daemon PR #221
  • Upgraded KCL and KCL-Multilang Dependencies from 2.4.4 to 2.5.1 PR #221
  • Upgraded Google Guava dependency from 31.0.1-jre to 32.0.0-jre PR #223
  • Added aws-java-sdk-sts dependency PR #212

Release 2.1.1 (January 17, 2023)

  • Include the pom file in MANIFEST

Release 2.1.0 (January 12, 2023)

Release 2.0.6 (November 23, 2021)

  • Upgraded multiple dependencies PR #152
    • Amazon Kinesis Client Library 2.3.9
    • ch.qos.logback 1.2.7

Release 2.0.5 (November 11, 2021)

  • Upgraded multiple dependencies PR #148
    • Amazon Kinesis Client Library 2.3.8
    • AWS SDK 2.17.52
  • Added dependencies
    • AWS SDK json-utils 2.17.52
    • third-party-jackson-core 2.17.52
    • third-party-jackson-dataformat-cbor 2.17.52
  • Updated samples/sample.properties to reflect support for InitialPositionInStreamExtended
    • Related: #804, allowing the user to specify an initial timestamp from which the daemon will process records.
    • Feature released with previous release 2.0.4

Release 2.0.4 (October 26, 2021)

  • Reverted/downgraded multiple dependencies because KCL 2.3.7 contains a breaking change PR #145
    • Amazon Kinesis Client Library 2.3.6
    • AWS SDK 2.16.98
  • Upgraded dependencies
    • jackson-dataformat-cbor 2.12.4
    • AWS SDK 1.12.3

⚠️ [BREAKING CHANGES] Release 2.0.3 (October 21, 2021)

  • Upgraded multiple dependencies in PR #142
    • Amazon Kinesis Client Library 2.3.7
    • AWS SDK 2.17.52
    • AWS Java SDK 1.12.1
    • AWS Glue 1.1.5
    • Jackson 2.12.4
    • io.netty 4.1.68.Final
    • guava 31.0.1-jre

Release 2.0.2 (June 4, 2021)

  • Upgraded multiple dependencies in PR #137
    • Amazon Kinesis Client Library 2.3.4
    • AWS SDK 2.16.75
    • AWS Java SDK 1.11.1031
    • Amazon ion java 1.5.1
    • Jackson 2.12.3
    • io.netty 4.1.65.Final
    • typesafe netty 2.0.5
    • reactivestreams 1.0.3
    • guava 30.1.1-jre
    • Error prone annotations 2.7.1
    • j2objc annotations 2.7.1
    • Animal sniffer annotations 1.20
    • slf4j 1.7.30
    • protobuf 3.17.1
    • Joda time 2.10.10
    • Apache httpclient 4.5.13
    • Apache httpcore 4.4.14
    • commons lang3 3.12.0
    • commons logging 1.2
    • commons beanutils 1.9.4
    • commons codec 1.15
    • commons collections4 4.4
    • commons io 2.9.0
    • jcommander 1.81
    • rxjava 2.2.21
  • Added AWS Glue Schema Registry 1.0.2

Release 2.0.1 (February 27, 2019)

  • Updated to version 2.1.2 of the Amazon Kinesis Client Library for Java.
    This update also includes version 2.4.0 of the AWS Java SDK.

Release 2.0.0 (January 15, 2019)

  • Introducing support for Enhanced Fan-Out
  • Updated to version 2.1.0 of the Amazon Kinesis Client for Java
    • Version 2.1.0 now defaults to using RegisterStreamConsumer Kinesis API, which provides dedicated throughput compared to GetRecords.
    • Version 2.1.0 now defaults to using SubscribeToShard Kinesis API, which provides lower latencies than GetRecords.
    • WARNING: RegisterStreamConsumer and SubscribeToShard are new APIs, and may require updating any explicit IAM policies
    • For more information about Enhanced Fan-Out and Polling with the KCL, check out the announcement and developer documentation.
  • Introducing version 3 of the RecordProcessorBase which supports the new ShardRecordProcessor interface
    • The shutdown method from version 2 has been removed and replaced by leaseLost and shardEnded methods.
    • Introducing leaseLost method, which takes LeaseLostInput object and is invoked when a lease is lost.
    • Introducing shardEnded method, which takes ShardEndedInput object and is invoked when all records from a split/merge have been processed.
  • Updated AWS SDK version to 2.2.0
  • MultiLangDaemon now uses logback for logging
    • MultiLangDaemon supports custom logback.xml file via the --log-configuration option.
    • amazon_kclpy_helper script supports --log-configuration option for command generation.

Release 1.5.1 (January 2, 2019)

  • Updated to version 1.9.3 of the Amazon Kinesis Client Library for Java.
  • Changed to now download jars from Maven using https.
  • Changed to raise exception when downloading from Maven fails.

Release 1.5.0 (February 7, 2018)

  • Updated to version 1.9.0 of the Amazon Kinesis Client Library for Java
    • Version 1.9.0 now uses the ListShards Kinesis API, which provides a higher call rate than DescribeStream.
    • WARNING: ListShards is a new API, and may require updating any explicit IAM policies
    • PR #71

Release 1.4.5 (June 28, 2017)

  • Record processors can now be notified, and given a final opportunity to checkpoint, when the KCL is being shut down.

    To use this feature the record processor must implement the shutdown_requested operation from the respective processor module. See v2/processor.py or kcl.py for the required API.

Release 1.4.4 (April 7, 2017)

  • PR #47: Update to release 1.7.5 of the Amazon Kinesis Client.
    • Additionally updated to version 1.11.115 of the AWS Java SDK.
    • Fixes Issue #43.
    • Fixes Issue #27.

Release 1.4.3 (January 3, 2017)

  • PR #39: Make record objects subscriptable for backwards compatibility.

Release 1.4.2 (November 21, 2016)

  • PR #35: Downloading JAR files now runs correctly.

Release 1.4.1 (November 18, 2016)

  • Installation of the library into a virtual environment on macOS and Windows now correctly downloads the jar files.

Release 1.4.0 (November 9, 2016)

  • Added a new v2 record processor class that allows access to updated features.
    • Record processor initialization
      • The initialize method receives an InitializeInput object that provides shard id, and the starting sequence and sub sequence numbers.
    • Process records calls
      • The process_records call now receives a ProcessRecordsInput object that, in addition to the records, includes the millisBehindLatest value for the batch of records
      • Records are now represented as a Record object that adds new data, and includes some convenience methods
        • Adds a binary_data method that handles the base64 decoding of the data.
        • Includes the sub sequence number of the record.
        • Includes the approximate arrival time stamp of the record.
    • Record processor shutdown
      • The method shutdown now receives a ShutdownInput object.
  • Checkpoint methods now accept a sub sequence number in addition to the sequence number.

Release 1.3.1

  • Version number increase to stay in line with PyPI.

Release 1.3.0

  • Updated dependency to Amazon KCL version 1.6.4

Release 1.2.0

  • Updated dependency to Amazon KCL version 1.6.1

Release 1.1.0 (January 27, 2015)

  • Python 3 support: all Python files are compatible with Python 3

Release 1.0.0 (October 21, 2014)

  • amazon_kclpy module exposes an interface to allow implementation of record processor executables that are compatible with the MultiLangDaemon
  • samples module provides a sample putter application using boto and a sample processing app using amazon_kclpy

License

This library is licensed under the Apache 2.0 License.

amazon-kinesis-client-python's People

Contributors

avahuang0429, avinashchowdary, dependabot[bot], hyandell, j0nnyr0berts, jcarey03, joshua-kim, jpeddicord, knorwood, lucienlu-aws, mikramulhaq, pelaezryan, pfifer, rexcsn, sahilpalvia, spg, stair-aws, thesnicketylemon, timgates42, timmartin19, vicki-c, vincentvilo-aws, zengyu714

amazon-kinesis-client-python's Issues

Follow up question: how to use initialize_input parameter

@pfifer Thanks for your reply in issue #81. The AWS Kinesis documentation says that 'The KCL calls the initialize method when the record processor is instantiated, passing a specific shard ID as a parameter. This record processor processes only this shard' (https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-implementation-app-py.html). So if I develop my application in Python, how can I scale it when there is a large volume of data?

Setup periodically fails to completely download jar

When pip installing the python client, the installation will complete successfully, but we end up with an empty amazon-kinesis-client-1.9.0.jar.

$ ls -la  /usr/local/lib/python2.7/dist-packages/amazon_kclpy/jars/amazon-kinesis-client-1.9.0.jar

-rw-r--r-- 1 root staff 0 Apr 11 12:31 /usr/local/lib/python2.7/dist-packages/amazon_kclpy/jars/amazon-kinesis-client-1.9.0.jar

Note: This happens in maybe 1 of every 30 installations.

$ grep -C 5 amazon-kinesis-client /var/log/cloud-init-output.log


  Running setup.py install for amazon-kclpy: started
    Running command /usr/bin/python -u -c "import setuptools, tokenize;__file__='/tmp/pip-build-64l3ox/amazon-kclpy/setup.py';f=getattr(tokenize, 'open', open)(__file__);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, __file__, 'exec'))" install --record /tmp/pip-cpcXRc-record/install-record.txt --single-version-externally-managed --compile
    running install
    Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/amazonaws/amazon-kinesis-client/1.9.0/amazon-kinesis-client-1.9.0.jar
    Saving http://search.maven.org/remotecontent?filepath=com/amazonaws/amazon-kinesis-client/1.9.0/amazon-kinesis-client-1.9.0.jar -> amazon_kclpy/jars/amazon-kinesis-client-1.9.0.jar
    Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-dynamodb/1.11.272/aws-java-sdk-dynamodb-1.11.272.jar
    Saving http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-dynamodb/1.11.272/aws-java-sdk-dynamodb-1.11.272.jar -> amazon_kclpy/jars/aws-java-sdk-dynamodb-1.11.272.jar
    Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-s3/1.11.272/aws-java-sdk-s3-1.11.272.jar
    Saving http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-s3/1.11.272/aws-java-sdk-s3-1.11.272.jar -> amazon_kclpy/jars/aws-java-sdk-s3-1.11.272.jar
    Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-kms/1.11.272/aws-java-sdk-kms-1.11.272.jar
--
    copying amazon_kclpy/jars/ion-java-1.0.2.jar -> build/lib.linux-x86_64-2.7/amazon_kclpy/jars
    copying amazon_kclpy/jars/httpclient-4.5.2.jar -> build/lib.linux-x86_64-2.7/amazon_kclpy/jars
    copying amazon_kclpy/jars/jackson-dataformat-cbor-2.6.7.jar -> build/lib.linux-x86_64-2.7/amazon_kclpy/jars
    copying amazon_kclpy/jars/commons-lang-2.6.jar -> build/lib.linux-x86_64-2.7/amazon_kclpy/jars
    copying amazon_kclpy/jars/aws-java-sdk-cloudwatch-1.11.272.jar -> build/lib.linux-x86_64-2.7/amazon_kclpy/jars
    copying amazon_kclpy/jars/amazon-kinesis-client-1.9.0.jar -> build/lib.linux-x86_64-2.7/amazon_kclpy/jars
    copying amazon_kclpy/jars/aws-java-sdk-kinesis-1.11.272.jar -> build/lib.linux-x86_64-2.7/amazon_kclpy/jars
    copying amazon_kclpy/jars/protobuf-java-2.6.1.jar -> build/lib.linux-x86_64-2.7/amazon_kclpy/jars
    copying amazon_kclpy/jars/aws-java-sdk-core-1.11.272.jar -> build/lib.linux-x86_64-2.7/amazon_kclpy/jars
    copying samples/sample.properties -> build/lib.linux-x86_64-2.7/samples
    running build_scripts
--
    copying build/lib.linux-x86_64-2.7/amazon_kclpy/jars/ion-java-1.0.2.jar -> /usr/local/lib/python2.7/dist-packages/amazon_kclpy/jars
    copying build/lib.linux-x86_64-2.7/amazon_kclpy/jars/httpclient-4.5.2.jar -> /usr/local/lib/python2.7/dist-packages/amazon_kclpy/jars
    copying build/lib.linux-x86_64-2.7/amazon_kclpy/jars/jackson-dataformat-cbor-2.6.7.jar -> /usr/local/lib/python2.7/dist-packages/amazon_kclpy/jars
    copying build/lib.linux-x86_64-2.7/amazon_kclpy/jars/commons-lang-2.6.jar -> /usr/local/lib/python2.7/dist-packages/amazon_kclpy/jars
    copying build/lib.linux-x86_64-2.7/amazon_kclpy/jars/aws-java-sdk-cloudwatch-1.11.272.jar -> /usr/local/lib/python2.7/dist-packages/amazon_kclpy/jars
    copying build/lib.linux-x86_64-2.7/amazon_kclpy/jars/amazon-kinesis-client-1.9.0.jar -> /usr/local/lib/python2.7/dist-packages/amazon_kclpy/jars
    copying build/lib.linux-x86_64-2.7/amazon_kclpy/jars/aws-java-sdk-kinesis-1.11.272.jar -> /usr/local/lib/python2.7/dist-packages/amazon_kclpy/jars
    copying build/lib.linux-x86_64-2.7/amazon_kclpy/jars/protobuf-java-2.6.1.jar -> /usr/local/lib/python2.7/dist-packages/amazon_kclpy/jars
    copying build/lib.linux-x86_64-2.7/amazon_kclpy/jars/aws-java-sdk-core-1.11.272.jar -> /usr/local/lib/python2.7/dist-packages/amazon_kclpy/jars
    creating /usr/local/lib/python2.7/dist-packages/samples

Nothing in the verbose output is indicating that the download or copy is failing, so I'm really not sure what to suggest. But it would be nice if there was some kind of validation step and the installation would fail if the validation failed.
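A validation step like the one requested could look roughly like this: after the jars are downloaded, check that each file is non-empty and opens as a zip archive (jar files are zip archives). find_bad_jars is a hypothetical helper, not part of amazon_kclpy's setup.py.

```python
import os
import zipfile


def find_bad_jars(jar_dir):
    """Return sorted names of .jar files in jar_dir that are empty or not valid zip archives."""
    bad = []
    for name in sorted(os.listdir(jar_dir)):
        if not name.endswith(".jar"):
            continue
        path = os.path.join(jar_dir, name)
        # A zero-byte or truncated download fails one of these checks.
        if os.path.getsize(path) == 0 or not zipfile.is_zipfile(path):
            bad.append(name)
    return bad
```

An install step could then fail loudly, for example by raising SystemExit with the list when find_bad_jars("amazon_kclpy/jars") returns anything.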

Cannot Override DynamoDB Endpoint

I am using kinesalite to run test automation. I was able to easily set the kinesalite endpoint for the Kinesis stream producer (a modification of sample_kinesis_wordputter.py) by using boto.kinesis.layer1.KinesisConnection(**kwargs).

When performing Kinesis consumer operations against kinesalite (using KCLpy), the KCL checkpointer writes lease tables to AWS's DynamoDB. I want it to do this against a local dynalite server instead. This would require KCLpy to allow specifying an endpoint for DynamoDB.

I haven't found any documentation about anything analogous to KinesisConnection. Is there any?

The plural of 'datum' is 'data'

Please remove reference to 'datums'-- these are actually items used in geological surveys (per google: Datums are the basis for all geodetic survey work. Near coastal areas, mean sea level (and other tidal datums) is determined by analyzing observations from a tide gauge. This image shows a tide gauge at the St. Charles Parish Water Level Monitoring System in Louisiana.)

I'm getting logging information that looks like:

INFO: Successfully published 18 datums.

and that's just wrong, since I'm assuming the library isn't logging tide gauges.

allow properties to be overridden from command line

I want to run multiple KCL consumers in Docker. To have multiple KCL consumer clients, I need each of them to use a different application name with the same stream. However, I am unable to set the application name from the command line, which means I would need multiple properties files, one per consumer, each with a different application name.

This is a problem for me because I will then have to know how many consumers I need to run beforehand, or I will need to rebuild the Docker image to add more properties files with different application names. Obviously there are ways to get around these problems, but they are a pain in the ass. The best way is to allow overriding properties from the command line. Thanks.
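Until such an option exists, one workaround sketch is to generate a per-container properties file at startup from a shared template, overriding only the keys that differ. override_properties is a hypothetical helper, and the key names in the test are illustrative; use whatever keys your properties file actually contains.

```python
import re

# Matches a simple `key = value` line; keys may not start with a comment marker.
_KEY_LINE = re.compile(r"\s*([^#!=\s][^=]*?)\s*=")


def override_properties(text, overrides):
    """Return properties text with the given keys' values replaced, or appended if absent.

    Only simple `key = value` lines are recognized; comments and other lines pass through.
    """
    remaining = dict(overrides)
    out_lines = []
    for line in text.splitlines():
        match = _KEY_LINE.match(line)
        if match and match.group(1) in remaining:
            key = match.group(1)
            out_lines.append("{0} = {1}".format(key, remaining.pop(key)))
        else:
            out_lines.append(line)
    # Keys not present in the template are appended at the end.
    for key, value in remaining.items():
        out_lines.append("{0} = {1}".format(key, value))
    return "\n".join(out_lines) + "\n"
```

A container entrypoint could read the template, call override_properties with a name taken from an environment variable of your choosing, write the result to a temporary file, and pass that file to amazon_kclpy_helper.py via --properties.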

Version 2.0 Release downloads incorrect JAR files

Related to issue #84 (which appears to have lost traction), it seems the most recent release of amazon_kclpy 2.0 broke my project because it is downloading outdated jar files. It results in the error Could not find com.amazonaws.services.kinesis.multilang.MultiLangDaemon

The issue can be manually fixed by removing all the jar files that are downloaded by default when the package is installed, and then using the python setup.py download_jars command from this repo's README to download the correct ones. However, this is not a good solution. In the meantime, I have hardcoded my project to use version 1.5 instead.

review return type in messages.Record.binary_data

From running the sample word app on Python 3.6.5, the data that comes into the process_record method has the type 'bytes'. That value comes from the binary_data property of the Record class. The declared return type for that property is str, but it should probably be updated to bytes.
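The reporter's observation matches how Python 3's base64 module behaves: base64.b64decode returns bytes, so a binary_data property built on it yields bytes rather than str. The snippet below assumes binary_data is a plain base64 decode of the record data, as the 1.4.0 release notes describe, and that the payload is UTF-8 text.

```python
import base64

# Record data arrives base64-encoded inside the daemon's JSON messages.
encoded = base64.b64encode("cat".encode("utf-8")).decode("ascii")

# b64decode always returns bytes on Python 3, so a str annotation would be misleading.
raw = base64.b64decode(encoded)

# Decode explicitly only when the payload is known to be UTF-8 text.
text = raw.decode("utf-8")
```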

Timer in a record processor

I've been trying to get a threaded timer to work inside a record processor, as we are buffering on the client side to optimize batch inserts. In the __init__ of our processor we are simply doing something like Timer(10, self.flush), and in flush we create a new timer. It appears as though the timer fires 2-3 times; sometimes the print statements for 2 of the flush calls appear right beside each other in stdout. Other times we get 3, and then they stop firing altogether... Any ideas?

Example of the init and flush of our record processor:

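    from threading import Timer

    # PILogger, CloudSearchInterface and CLOUDSEARCH_DOCUMENT_DOMAIN come from the reporter's own project.

    def __init__(self):
        # Schedule the first flush 10 seconds from now on a separate thread.
        self.rp = Timer(10, self.flush)
        self.rp.start()
        self.SLEEP_SECONDS = 5
        self.CHECKPOINT_RETRIES = 5
        self.CHECKPOINT_FREQ_SECONDS = 3600
        self.logger = PILogger("cloudsearch_document_processor")
        self.writer = CloudSearchInterface(CLOUDSEARCH_DOCUMENT_DOMAIN)

    def flush(self):
        # Note: stdout is the same channel the MultiLangDaemon reads protocol messages from.
        print("HIT TIMER! {0}".format(self))
        # Re-arm: each Timer fires only once, so a new one is created after every flush.
        self.rp = Timer(10, self.flush)
        self.rp.start()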
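One plausible contributor, given that the MultiLangDaemon talks to the child process over standard input/output, is that print writes into the same stdout stream the daemon parses as protocol messages. The sketch below is an alternative pattern, not a confirmed fix: a single background thread that waits on a threading.Event between flushes and reports errors on stderr. PeriodicFlusher is a hypothetical name; if flush_fn touches buffers that process_records also modifies, guard them with a lock.

```python
import sys
import threading


class PeriodicFlusher:
    """Call flush_fn every `interval` seconds on one background thread until stop()."""

    def __init__(self, interval, flush_fn):
        self.interval = interval
        self.flush_fn = flush_fn
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        # Event.wait returns False on timeout and True once stop() sets the event.
        while not self._stop_event.wait(self.interval):
            try:
                self.flush_fn()
            except Exception as exc:
                # Keep the loop alive; report on stderr so stdout stays protocol-only.
                print("flush failed: {0}".format(exc), file=sys.stderr)

    def stop(self):
        self._stop_event.set()
        if self._thread.is_alive():
            self._thread.join()
```

Reusing one thread avoids creating a new Timer thread on every flush, and stop() gives the processor a clean place to shut the flusher down from its shutdown hooks.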

Received ValueError: year is out of range from kclpy consistently on process start

When starting up the process, we receive an error that says:

self._approximate_arrival_timestamp = datetime.fromtimestamp(self._timestamp_millis / 1000.0)
Received error line from subprocess [ValueError: year is out of range] for shard shardId-000000000000
ValueError: year is out of range

From our experimentation it seems that self._timestamp_millis is in the 1e15 range (as in a microsecond value is being sent through and not a millisecond value).

We put a hacky solution in place in our environment to work around the problem temporarily:

if self._timestamp_millis >= 1e13:
    self._timestamp_millis = self._timestamp_millis / 1000.0
self._approximate_arrival_timestamp = datetime.fromtimestamp(self._timestamp_millis / 1000.0)

This stopped the issue from occurring, but obviously it's not a permanent solution. We also downgraded to the version of this library we were previously using, and that worked fine as well.

The problem occurs on amazon_kclpy==2.0.0, and we are using OpenJDK 1.8:

openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-1~bpo8+1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)
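A slightly more general version of the reporter's workaround is sketched below: it guesses the unit from the magnitude (values at or above 1e13 are treated as microseconds, mirroring the threshold in the workaround above) and returns a timezone-aware UTC datetime instead of a local-time one. arrival_time_from_epoch is a hypothetical helper; the real fix belongs in the library or daemon, and this heuristic is only a stopgap.

```python
from datetime import datetime, timezone


def arrival_time_from_epoch(value):
    """Convert an epoch timestamp to a UTC datetime, guessing its unit by magnitude.

    Heuristic (an assumption): values >= 1e13 are microseconds, otherwise milliseconds.
    Current millisecond timestamps are around 1.7e12, well below the threshold.
    """
    seconds = value / 1e6 if value >= 1e13 else value / 1e3
    return datetime.fromtimestamp(seconds, tz=timezone.utc)
```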

How to change the log level

I am using kcl-python to develop my consumer, and I am running my application using the command: amazon_kclpy_helper.py --print_command --java <path-to-java> --properties samples/sample.properties
However, I am not able to change the log level of the output being dumped on the console. Can you please guide me? Help would be much appreciated.
Thanks
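Since release 2.0.0 (see the release notes above), the MultiLangDaemon logs through logback, and both the daemon and amazon_kclpy_helper.py accept a --log-configuration option pointing at a logback.xml file. A minimal sketch that writes such a file with a chosen root level follows; the appender name, pattern, and build_logback_config helper are illustrative choices, and none of this applies to the 1.x releases.

```python
import xml.etree.ElementTree as ET


def build_logback_config(root_level="WARN"):
    """Return a minimal logback.xml (as a string) that logs to the console at root_level."""
    configuration = ET.Element("configuration")
    # "class" is a Python keyword, so it is passed through the attrib dict.
    appender = ET.SubElement(configuration, "appender", attrib={
        "name": "CONSOLE",
        "class": "ch.qos.logback.core.ConsoleAppender",
    })
    encoder = ET.SubElement(appender, "encoder")
    ET.SubElement(encoder, "pattern").text = "%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n"
    # The root logger level controls how much the daemon prints.
    root = ET.SubElement(configuration, "root", attrib={"level": root_level})
    ET.SubElement(root, "appender-ref", attrib={"ref": "CONSOLE"})
    return ET.tostring(configuration, encoding="unicode")
```

After writing the result to logback.xml, add --log-configuration logback.xml to the amazon_kclpy_helper.py invocation when generating the command.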

Cannot run Kinesis apps due to "Reached end of STDIN of child process for shard" 2

I am trying to run the vanilla python example sample_kclpy_app.py and get the error listed below.

This seems to be similar to the issue #5 but I haven't made any changes to the code provided in this repository.

I did change the sample.properties file line: executableName = python sample_kclpy_app.py as suggested by this stack overflow answer: http://stackoverflow.com/questions/30043197/how-to-verify-that-amazon-kinesis-python-client-is-working

Without the last change, I don't get the error, but my script also does not seem to be executed (I don't get any log messages from my code, and as I understand the comments in #5, I should get INFO log events, right?)

May 20, 2015 2:06:17 AM com.amazonaws.services.kinesis.multilang.MultiLangProtocol waitForStatusMessage
SEVERE: Failed to get status message for initialize action for shard shardId-000000000000
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:141)
at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:116)
at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.initialize(MultiLangProtocol.java:70)
at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.initialize(MultiLangRecordProcessor.java:131)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask.call(InitializeTask.java:74)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message.
at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:84)
at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:31)
at com.amazonaws.services.kinesis.multilang.LineReaderTask.call(LineReaderTask.java:70)
... 4 more

May 20, 2015 2:06:17 AM com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor stopProcessing
SEVERE: Encountered an error while trying to initialize record processor
java.lang.RuntimeException: Failed to initialize child process
at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.initialize(MultiLangRecordProcessor.java:132)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask.call(InitializeTask.java:74)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

May 20, 2015 2:06:17 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Draining STDOUT for shardId-000000000000
May 20, 2015 2:06:17 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Stopping: Draining STDOUT for shardId-000000000000
May 20, 2015 2:06:17 AM com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor childProcessShutdownSequence
INFO: Child process exited with value: 2

Error: Could not find or load main class com.amazonaws.services.kinesis.multilang.MultiLangDaemon

I was using the amazon_kclpy_helper.py command to print the command to run Python KCL client so far and it stopped working approximately yesterday.

When I now create a new virtualenv and reinstall amazon_kclpy, it fails in all my environments (other working machines, build machines, etc.). All the jars were correctly downloaded into the virtualenv when installing the library.

Also, all the samples in the documentation/README fail with the same error.

Examples from my vagrant development VM:

amazon_kclpy_helper.py --print_command  --java /usr/bin/java --properties my_properties.properties

/usr/bin/java -cp /home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/httpcore-4.4.4.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/commons-logging-1.1.3.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/aws-java-sdk-cloudwatch-1.11.272.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/aws-java-sdk-kms-1.11.272.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/aws-java-sdk-kinesis-1.11.272.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/amazon-kinesis-client-1.9.0.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/ion-java-1.0.2.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/httpclient-4.5.2.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/aws-java-sdk-s3-1.11.272.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/jackson-databind-2.6.7.1.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/commons-lang-2.6.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/jmespath-java-1.11.272.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/commons-codec-1.9.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/aws-java-sdk-core-1.11.272.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/joda-time-2.8.1.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/jackson-annotations-2.6.0.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/jackson-core-2.6.7.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/guava-18.0.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/protobuf-java-2.6.1.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/jackson-dataformat-cbor-2.6.7.jar:/home/vagrant/.local/share/virtualenvs/data-service-8jYGrd1m/lib/python3.7/site-packages/amazon_kclpy/jars/aws-java-sdk-dynamodb-1.11.272.jar:/vagrant/projects/data-service/consumer/config com.amazonaws.services.kinesis.multilang.MultiLangDaemon development.properties

Error: Could not find or load main class com.amazonaws.services.kinesis.multilang.MultiLangDaemon

Did something change in the dependent libraries that would cause this command to fail to run the KCL?

Thank you for looking into it.
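`Could not find or load main class` means the JVM found no classpath entry containing that class. One way to narrow it down is to split the `-cp` value on the platform path separator, list entries that do not exist on disk, and then open each jar (a jar is a zip file) to see whether any of them actually contains `MultiLangDaemon`. A minimal sketch; `missing_classpath_entries` and `jars_containing` are hypothetical helper names, not part of amazon_kclpy.

```python
import os
import zipfile


def missing_classpath_entries(classpath):
    """Return classpath entries (jars or directories) that do not exist on disk."""
    return [entry for entry in classpath.split(os.pathsep)
            if entry and not os.path.exists(entry)]


def jars_containing(classpath, class_name):
    """Return the jars on the classpath that contain the given class.

    class_name uses dots, e.g.
    'com.amazonaws.services.kinesis.multilang.MultiLangDaemon'.
    """
    member = class_name.replace(".", "/") + ".class"
    found = []
    for entry in classpath.split(os.pathsep):
        # is_zipfile() returns False (rather than raising) for missing files.
        if entry.endswith(".jar") and zipfile.is_zipfile(entry):
            with zipfile.ZipFile(entry) as jar:
                if member in jar.namelist():
                    found.append(entry)
    return found
```

Pass it the string that follows `-cp` in the printed command. If every entry exists but `jars_containing` returns an empty list, the class is simply not in any of the bundled jars, which points at the jar set rather than at the paths.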

Records not being processed with sample app - simple print(data) not working

I've gone through several issues and SO questions and haven't been able to get the sample app to work.

Here's how to reproduce this:

# Run dynamodb
docker run -d -p 8000:8000 dwmkerr/dynamodb

# Create java8 container
docker run -it jkosgei/alpine-java8 ash # Source at https://github.com/jonathan-kosgei/alpine-java8

export JAVA_HOME=/usr/lib/jvm/java-1.8-openjdk/jre
export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=

apk add --no-cache python3 curl git
git clone https://github.com/awslabs/amazon-kinesis-client-python.git
cd amazon-kinesis-client-python

# edit sample.properties with sample below

# Start sample producer
sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster

# Do setup
python3 setup.py download_jars &&\
python3 setup.py install

# Run sample app
`amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties prod.properties`

The following are the only things I've changed in my prod.properties

executableName = python3 /amazon-kinesis-client-python/samples/sample_kclpy_app.py
dynamoDBEndpoint = http://127.0.0.1:8000
applicationName = test
maxRecords = 10000
idleTimeBetweenReadsInMillis = 200
callProcessRecordsEvenForEmptyRecordList = true

In sample_kclpy_app.py, my process_record function looks like this:

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this record.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        print('Data: ', data)
        with open('/log.txt', 'a') as the_file:
            the_file.write(data)
        return
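Two things in this snippet are worth checking. In amazon_kclpy, `KCLProcess` exchanges protocol messages with the daemon over the child process's standard input and output by default, so `print()` to stdout shares a channel with those messages; the sample's own error handling writes to `sys.stderr` instead. Also, if `data` arrives as `bytes` (as base64-decoded record data would under Python 3), writing it to a file opened in text mode `'a'` raises `TypeError`. A minimal sketch under those assumptions; `append_record` is a hypothetical helper, not part of the sample app.

```python
import sys


def append_record(data, path):
    """Append one record's payload to a file and log a short note to stderr.

    Assumes data is bytes (e.g. base64-decoded record data under Python 3);
    str input is encoded as UTF-8 first.
    """
    if isinstance(data, str):
        data = data.encode("utf-8")
    # Binary append mode accepts bytes; text mode ('a') would raise TypeError.
    with open(path, "ab") as log_file:
        log_file.write(data + b"\n")
    # Human-readable output goes to stderr, leaving stdout to the KCL protocol.
    sys.stderr.write("Data: {!r}\n".format(data))
    sys.stderr.flush()
```

Inside `process_record`, the body would then reduce to `append_record(data, '/log.txt')`.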

My logs

/amazon-kinesis-client-python # `amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties prod.properties`
Mar 07, 2018 11:36:26 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator getConfiguration
INFO: Value of workerId is not provided in the properties. WorkerId is automatically assigned as: 13bdbb57-e701-4be1-b2ca-6b808fa95b73
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property regionName with value us-east-1
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property idleTimeBetweenReadsInMillis with value 200
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property maxRecords with value 10000
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property callProcessRecordsEvenForEmptyRecordList with value true
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property dynamoDBEndpoint with value http://127.0.0.1:8000
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property initialPositionInStream with value TRIM_HORIZON
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig buildExecutorService
INFO: Using a cached thread pool.
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig <init>
INFO: Running test to process stream words with executable python3 
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using workerId: bleh
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using credentials with access key id: bleh
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: MultiLangDaemon is adding the following fields to the User Agent: amazon-kinesis-client-library-java-1.9.0 amazon-kinesis-multi-lang-daemon/1.0.1 python/3.6 python3
Mar 07, 2018 11:36:28 AM com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator <init>
INFO: With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
Mar 07, 2018 11:36:28 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initialization attempt 1
Mar 07, 2018 11:36:28 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initializing LeaseCoordinator
Mar 07, 2018 11:36:37 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator initialize
INFO: Created new lease table for coordinator with initial read capacity of 10 and write capacity of 10.
Mar 07, 2018 11:36:41 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Syncing Kinesis shard info
Mar 07, 2018 11:36:43 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Starting LeaseCoordinator
Mar 07, 2018 11:36:46 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
INFO: Worker 13bdbb57-e701-4be1-b2ca-6b808fa95b73 saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
Mar 07, 2018 11:36:48 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker takeLeases
INFO: Worker 13bdbb57-e701-4be1-b2ca-6b808fa95b73 successfully took 1 leases: shardId-000000000000
Mar 07, 2018 11:36:56 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker run
INFO: Initialization complete. Starting worker loop.
Mar 07, 2018 11:36:58 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker infoForce
INFO: Created new shardConsumer for : ShardInfo [shardId=shardId-000000000000, concurrencyToken=bleh, parentShardIds=[], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}]
Mar 07, 2018 11:36:58 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
INFO: No need to block on parents [] of shard shardId-000000000000
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
INFO: Initializing shard shardId-000000000000 with TRIM_HORIZON
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing InitializeMessage to child process for shard shardId-000000000000
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading STDERR for shardId-000000000000
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.MessageWriter call
INFO: Message size == 110 bytes for shard shardId-000000000000
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000000
Mar 07, 2018 11:37:29 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:37:29 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 11:38:39 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:38:39 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 11:39:42 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:39:42 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 11:40:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:40:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 11:41:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:41:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...

Pip install from cache does not include KCL jars

We noticed that after the recent changes (fix of #28) the amazon_kclpy package has a bug regarding downloads of JAR files.

Steps to reproduce:

$ cd /tmp
$ virtualenv .venv
$ . .venv/bin/activate
$ pip install --no-cache amazon_kclpy
$ ls .venv/lib/python*/site-packages/amazon_kclpy/jars
__init__.py amazon-kinesis-client-1.7.2.jar aws-java-sdk-cloudwatch-1.11.14.jar ...
$ pip uninstall -y amazon_kclpy
$ pip install amazon_kclpy
$ ls .venv/lib/python*/site-packages/amazon_kclpy/jars
__init__.py

After the last step, the JAR files are missing from the jars directory. It would be great to get this fixed: most people will probably not use the --no-cache option with pip install, and without it the library is unusable.
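To check an installation for this symptom without hard-coding the virtualenv path, a sketch like the following locates the installed package with `importlib.util.find_spec` and lists the `.jar` files in its `jars` directory. It assumes the layout shown above (`amazon_kclpy/jars` next to the package's `__init__.py`); `installed_jars` is a hypothetical helper name.

```python
import importlib.util
import os


def installed_jars(package="amazon_kclpy"):
    """Return sorted .jar filenames in <package>/jars.

    Returns None if the package is not importable, and [] if the jars
    directory is missing.
    """
    spec = importlib.util.find_spec(package)
    if spec is None or spec.origin is None:
        return None
    # For a regular package, spec.origin is the path to its __init__.py.
    jar_dir = os.path.join(os.path.dirname(spec.origin), "jars")
    if not os.path.isdir(jar_dir):
        return []
    return sorted(name for name in os.listdir(jar_dir) if name.endswith(".jar"))
```

An empty list right after `pip install amazon_kclpy` reproduces the report; reinstalling without the cache, as in the first install above, is the workaround the report implies.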

multilang daemon logs

I need to write the MultiLangDaemon logs (the daemon's own logs, not my application logs) to a file.
Preferably, I'd also like to set the logging level.
Java is not really my thing, and I can't find a way to do it from the docs.
How can I set it up?
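The daemon's log lines elsewhere on this page (`Mar 07, 2018 11:36:26 AM <class> <method>` followed by `INFO: ...`) look like the default java.util.logging format. Assuming that is the logging backend in use, one approach is a standard java.util.logging properties file passed through the `java.util.logging.config.file` system property. The sketch below writes such a file and inserts the `-D` flag after the java binary in a daemon command; `write_jul_config` and `with_jul_config` are hypothetical helpers, and the property keys are generic java.util.logging settings rather than KCL-specific ones.

```python
def write_jul_config(path, log_file, level="INFO"):
    """Write a java.util.logging config that logs to log_file and the console.

    Assumes log_file is a POSIX path without '%' characters (FileHandler
    treats '%' sequences in its pattern specially). A config file given this
    way replaces the JRE defaults, so the ConsoleHandler is listed explicitly.
    """
    lines = [
        "handlers = java.util.logging.FileHandler, java.util.logging.ConsoleHandler",
        ".level = {}".format(level),
        "java.util.logging.FileHandler.pattern = {}".format(log_file),
        "java.util.logging.FileHandler.append = true",
        "java.util.logging.FileHandler.formatter = java.util.logging.SimpleFormatter",
        "java.util.logging.FileHandler.level = {}".format(level),
    ]
    with open(path, "w") as f:
        f.write("\n".join(lines) + "\n")


def with_jul_config(command, config_path):
    """Return a copy of a java argv list with the JUL config flag after the java binary."""
    flag = "-Djava.util.logging.config.file={}".format(config_path)
    return [command[0], flag] + list(command[1:])
```

In practice this means adding `-Djava.util.logging.config.file=/path/to/logging.properties` right after `java` in the command that `amazon_kclpy_helper.py --print_command` prints; levels such as `FINE` give more detail than `INFO`.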

Unable to use EC2ContainerCredentialsProviderWrapper in kinesis consumer

Inside an ECS container, DefaultAWSCredentialsProviderChain picks up the ECS host's profile instead of the taskRoleArn provided in the ECS task definition.

We are using the amazon_kclpy client to consume from a stream, and we have the issue even with the latest version (1.4.3). When I explicitly set AWSCredentialsProvider = EC2ContainerCredentialsProviderWrapper in the properties file, I get the error below:
No credential providers specified
java com.amazonaws.services.kinesis.multilang.MultiLangDaemon

It looks like this is because setup.py references aws-java-sdk-core 1.11.14 here:
https://github.com/awslabs/amazon-kinesis-client-python/blob/master/setup.py#L66

However, as far as I can see, EC2ContainerCredentialsProviderWrapper is only available from 1.11.16 onwards.

Can you please update https://github.com/awslabs/amazon-kinesis-client-python/blob/master/setup.py to use the latest version of the aws-java-sdk-* jars?
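Following the reporter's reasoning (the 1.11.16 minimum is their claim), a quick way to see which aws-java-sdk-core version an installation bundles is to parse it from the jar filenames. `sdk_core_version` and `at_least` are hypothetical helpers; they compare versions as integer tuples, because plain string comparison would rank `1.11.9` above `1.11.16`.

```python
import re

# Matches names like aws-java-sdk-core-1.11.14.jar
_CORE_JAR = re.compile(r"^aws-java-sdk-core-(\d+(?:\.\d+)*)\.jar$")


def sdk_core_version(jar_names):
    """Return the aws-java-sdk-core version as a tuple of ints, or None if absent."""
    for name in jar_names:
        match = _CORE_JAR.match(name)
        if match:
            return tuple(int(part) for part in match.group(1).split("."))
    return None


def at_least(version, minimum):
    """True if a version tuple is present and not older than minimum."""
    return version is not None and version >= minimum
```

The filenames can come from a listing of the installed package's `jars` directory; `at_least(sdk_core_version(names), (1, 11, 16))` then answers the question for that install.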

create a boto CredentialsProvider

There are four extant CredentialsProvider classes in Java, but none of them load boto files. Since boto is the de facto way to use AWS from Python, there should be a way to use boto files with this project (regardless of what's happening under the hood).
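Until such a provider exists, one workaround is to read the boto file in Python and hand the keys to the Java daemon through the environment variables that the AWS SDK for Java's environment-variable provider reads (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`). This sketch assumes a boto 2 style file with a `[Credentials]` section and a daemon configured with a provider chain that consults the environment; `boto_env` is a hypothetical helper.

```python
import configparser
import os


def boto_env(path=None):
    """Map boto-style [Credentials] keys to AWS SDK environment variable names.

    Assumes a boto 2 style config file (default ~/.boto). Returns an empty
    dict if the file or the [Credentials] section is missing.
    """
    if path is None:
        path = os.path.expanduser("~/.boto")
    # interpolation=None keeps '%' characters in secrets literal.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(path)  # files that do not exist are silently skipped
    if not parser.has_section("Credentials"):
        return {}
    section = parser["Credentials"]
    env = {}
    for boto_key, env_name in (("aws_access_key_id", "AWS_ACCESS_KEY_ID"),
                               ("aws_secret_access_key", "AWS_SECRET_ACCESS_KEY")):
        if boto_key in section:
            env[env_name] = section[boto_key]
    return env
```

The result would be merged into the daemon's environment at launch, e.g. `subprocess.call(command, env=dict(os.environ, **boto_env()))`.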

KCL not doing what I'm expecting it to do

I'm running the sample code.

I'm running

sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster

and some words get pushed:

Connecting to stream: words in us-east-1
Put word: cat into stream: words
Put word: dog into stream: words
Put word: bird into stream: words
Put word: lobster into stream: words

but when I run python sample_kclpy_app.py, I don't get any output.

I'm expecting python sample_kclpy_app.py to output "getting word: cat..." or something along those lines.

Am I missing something? It looks like the stream is not being listened to.
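`sample_kclpy_app.py` is the record processor executable that the MultiLangDaemon launches as a child process; run directly, it most likely just waits for protocol messages on stdin, so it prints nothing. The daemon itself is started with the command that `amazon_kclpy_helper.py --print_command` prints (other reports on this page run it as `--java /usr/bin/java --properties ...`). A minimal launcher sketch, assuming the helper is on `PATH` and prints a single shell command line; `daemon_command` and `run_daemon` are hypothetical names.

```python
import shlex
import subprocess


def daemon_command(helper_output):
    """Split the helper's printed command line into an argv list for subprocess."""
    return shlex.split(helper_output.strip())


def run_daemon(java="/usr/bin/java", properties="samples/sample.properties"):
    """Ask amazon_kclpy_helper.py for the daemon command, then run it in the foreground."""
    printed = subprocess.check_output(
        ["amazon_kclpy_helper.py", "--print_command",
         "--java", java, "--properties", properties],
        universal_newlines=True,
    )
    return subprocess.call(daemon_command(printed))
```

Running the command (rather than the sample script) is what starts the worker; the daemon then spawns the executable named in the properties file for each shard it leases.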

Stuck on Sleeping.... Stopping: Reading next message from STDIN

We got an error today and we are not sure how to handle it.

After running fine for several hours across multiple shards, the app threw an exception and got stuck in a 'Sleeping' state, even though there are records to be processed for that shard.

How can we intercept this exception and handle it within our Python app? It never got to process_records. After we manually killed the task and restarted it, everything went back to normal.

Jun 05, 2018 5:15:24 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000125
Jun 05, 2018 5:15:24 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Stopping: Reading next message from STDIN for shardId-000000000125
Jun 05, 2018 5:15:24 PM com.amazonaws.services.kinesis.multilang.MultiLangProtocol waitForStatusMessage
SEVERE: Failed to get status message for processRecords action for shard shardId-000000000125
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000125 so won't be able to return a message.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:165)
at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:133)
at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.processRecords(MultiLangProtocol.java:87)
at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.processRecords(MultiLangRecordProcessor.java:105)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:215)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:170)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000125 so won't be able to return a message.
at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:84)
at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:31)
at com.amazonaws.services.kinesis.multilang.LineReaderTask.call(LineReaderTask.java:70)
... 4 more
Jun 05, 2018 5:15:31 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000125
Jun 05, 2018 5:15:31 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Jun 05, 2018 5:16:32 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000125
Jun 05, 2018 5:16:32 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
....

Thanks in advance!
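`Reached end of STDIN of child process` means the daemon hit end-of-file while reading the Python child's output, which most likely indicates that the child process exited (or closed its stdout) in the middle of `processRecords`. That Java-side exception cannot be caught inside Python, but the Python side can make its own failure visible before it dies. A sketch of a guard around the process entry point, writing the traceback to stderr (which the daemon reads, per the `Reading STDERR` log lines); `run_guarded` is a hypothetical helper.

```python
import sys
import traceback


def run_guarded(main):
    """Call main(); on any uncaught exception, log the traceback to stderr and re-raise.

    Re-raising keeps the non-zero exit status, so the failure is not hidden.
    """
    try:
        return main()
    except Exception:
        traceback.print_exc(file=sys.stderr)
        sys.stderr.flush()
        raise
```

In the processor script this would wrap the existing call, e.g. `run_guarded(kcl.KCLProcess(RecordProcessor()).run)` inside the `if __name__ == "__main__":` block, so the cause of the exit appears in the daemon's log next to the stack trace above.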

Documentation is outdated

This refers to the v1 RecordProcessor format, but it seems like v2 is the new standard. The README in the repo should also be updated.

Not able to run the sample app

I am running the sample app using this command:
amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties samples/sample.properties

I am also running the wordputter app provided with the samples.

I can run the wordputter successfully, but I can't tell whether the sample app is actually fetching the data.

The only changes I have made to the sample app are setting initialPositionInStream = LATEST in the sample.properties file and adding a print statement to both record processor functions.

The biggest challenge I am facing is that no logs are being generated (or maybe they are, and I just don't know where to look for them), which makes debugging even more difficult. Any sort of help is appreciated.
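Two observations that may help. First, `--print_command` suggests the helper prints the daemon command; the first report on this page wraps the same invocation in backticks so the shell also executes it. Second, the record processor runs as a child of the daemon, which reads its stderr (see the `Reading STDERR for shardId-...` log lines elsewhere on this page), while stdout carries protocol messages, so `print()` output is best replaced with logging that goes to stderr and/or a file. A sketch using the standard `logging` module; `configure_logging` and the file path are illustrative assumptions, not amazon_kclpy features.

```python
import logging
import sys


def configure_logging(log_path, level=logging.INFO, name="record_processor"):
    """Send log records to log_path and to stderr; never to stdout."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.propagate = False  # keep records away from root handlers that might use stdout
    if not logger.handlers:  # avoid duplicate handlers if called twice
        fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        for handler in (logging.FileHandler(log_path), logging.StreamHandler(sys.stderr)):
            handler.setFormatter(fmt)
            logger.addHandler(handler)
    return logger
```

Calling it once at module import in the processor script (for example with a path under `/tmp`) and replacing each `print(...)` with `logger.info(...)` gives a file to inspect even when the daemon's console output is hard to find.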

Sleeping without processing records

Hey there! I've recently started to use this Amazon Kinesis Client Library for Python. The issue I'm facing is that when I start the consumer application, it sleeps for some time (from minutes to a couple of hours) before processing the records in the stream.

Detailed logs
May 05, 2017 8:15:53 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator getConfiguration
INFO: Value of workerId is not provided in the properties. WorkerId is automatically assigned as: af6ba3a2-c543-4a10-86a3-8088faaa332b
May 05, 2017 8:15:53 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property initialPositionInStream with value TRIM_HORIZON
May 05, 2017 8:15:53 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig buildExecutorService
INFO: Using a cached thread pool.
May 05, 2017 8:15:53 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig <init>
INFO: Running oplog-client-beta to process stream mongo_oplog with executable ./main.py
May 05, 2017 8:15:53 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using workerId: af6ba3a2-c543-4a10-86a3-8088faaa332b
May 05, 2017 8:15:53 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using credentials with access key id: ****************
May 05, 2017 8:15:53 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: MultiLangDaemon is adding the following fields to the User Agent: amazon-kinesis-client-library-java-1.7.4 amazon-kinesis-multi-lang-daemon/1.0.1 python/2.7 ./main.py
May 05, 2017 8:15:53 AM com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator <init>
INFO: With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
May 05, 2017 8:15:53 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initialization attempt 1
May 05, 2017 8:15:53 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initializing LeaseCoordinator
May 05, 2017 8:15:54 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Syncing Kinesis shard info
May 05, 2017 8:15:55 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Starting LeaseCoordinator
May 05, 2017 8:15:55 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
INFO: Worker af6ba3a2-c543-4a10-86a3-8088faaa332b needed 1 leases but none were expired, so it will steal lease shardId-000000000007 from 3f1f8403-de5b-418d-b1d0-bbf2b1f3f6f2
May 05, 2017 8:15:55 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
INFO: Worker af6ba3a2-c543-4a10-86a3-8088faaa332b saw 2 total leases, 0 available leases, 2 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
May 05, 2017 8:15:55 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker takeLeases
INFO: Worker af6ba3a2-c543-4a10-86a3-8088faaa332b successfully took 1 leases: shardId-000000000007
May 05, 2017 8:16:05 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker run
INFO: Initialization complete. Starting worker loop.
May 05, 2017 8:16:05 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker infoForce
INFO: Created new shardConsumer for : ShardInfo [shardId=shardId-000000000007, concurrencyToken=e08ce398-0212-4cf7-a381-15786e890fa9, parentShardIds=[shardId-000000000003, shardId-000000000004], checkpoint={SequenceNumber: 49571726813165911595511780746709614940171352450980118642,SubsequenceNumber: 0}]
May 05, 2017 8:16:05 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
INFO: No lease found for shard shardId-000000000003. Not blocking on completion of this shard.
May 05, 2017 8:16:06 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
INFO: No lease found for shard shardId-000000000004. Not blocking on completion of this shard.
May 05, 2017 8:16:06 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
INFO: No need to block on parents [shardId-000000000003, shardId-000000000004] of shard shardId-000000000007
May 05, 2017 8:16:06 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
INFO: Initializing shard shardId-000000000007 with 49571726813165911595511780746709614940171352450980118642
May 05, 2017 8:16:07 AM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing InitializeMessage to child process for shard shardId-000000000007
May 05, 2017 8:16:07 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading STDERR for shardId-000000000007
May 05, 2017 8:16:07 AM com.amazonaws.services.kinesis.multilang.MessageWriter call
INFO: Message size == 154 bytes for shard shardId-000000000007
May 05, 2017 8:16:07 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000007
May 05, 2017 8:16:07 AM com.amazonaws.services.kinesis.multilang.MultiLangProtocol validateStatusMessage
INFO: Received response {"action":"status","responseFor":"initialize"} from subprocess while waiting for initialize while processing shard shardId-000000000007
May 05, 2017 8:16:16 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
INFO: Worker af6ba3a2-c543-4a10-86a3-8088faaa332b saw 2 total leases, 1 available leases, 1 workers. Target is 2 leases, I have 1 leases, I will take 1 leases
May 05, 2017 8:16:16 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker takeLeases
INFO: Worker af6ba3a2-c543-4a10-86a3-8088faaa332b successfully took 1 leases: shardId-000000000008
May 05, 2017 8:16:16 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker infoForce
INFO: Created new shardConsumer for : ShardInfo [shardId=shardId-000000000008, concurrencyToken=0f0ac246-98fa-4c72-8dba-1d52e4d33eec, parentShardIds=[shardId-000000000005, shardId-000000000006], checkpoint={SequenceNumber: 49571726824249381959181500328440537403186558546802114690,SubsequenceNumber: 0}]
May 05, 2017 8:16:16 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
INFO: No lease found for shard shardId-000000000005. Not blocking on completion of this shard.
May 05, 2017 8:16:17 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
INFO: No lease found for shard shardId-000000000006. Not blocking on completion of this shard.
May 05, 2017 8:16:17 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
INFO: No need to block on parents [shardId-000000000005, shardId-000000000006] of shard shardId-000000000008
May 05, 2017 8:16:17 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
INFO: Initializing shard shardId-000000000008 with 49571726824249381959181500328440537403186558546802114690
May 05, 2017 8:16:18 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading STDERR for shardId-000000000008
May 05, 2017 8:16:18 AM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing InitializeMessage to child process for shard shardId-000000000008
May 05, 2017 8:16:18 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000008
May 05, 2017 8:16:18 AM com.amazonaws.services.kinesis.multilang.MessageWriter call
INFO: Message size == 154 bytes for shard shardId-000000000008
May 05, 2017 8:16:18 AM com.amazonaws.services.kinesis.multilang.MultiLangProtocol validateStatusMessage
INFO: Received response {"action":"status","responseFor":"initialize"} from subprocess while waiting for initialize while processing shard shardId-000000000008
May 05, 2017 8:16:54 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000007, shardId-000000000008
May 05, 2017 8:16:54 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
May 05, 2017 8:17:56 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000007, shardId-000000000008
May 05, 2017 8:17:56 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
May 05, 2017 8:18:57 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000007, shardId-000000000008
May 05, 2017 8:18:57 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...

I would love to know what may be causing this and what I can do to solve it. Thanks!
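One way to see what the workers believe is happening is to inspect the KCL lease table in DynamoDB, which by default is named after `applicationName` (here presumably `oplog-client-beta`, per the `Running oplog-client-beta to process stream mongo_oplog` line). The sketch below only summarizes items that have already been fetched; it assumes the KCL 1.x attribute names `leaseKey`, `leaseOwner` and `checkpoint`, which should be confirmed against the actual table. `summarize_leases` and `leases_per_owner` are hypothetical helpers.

```python
from collections import Counter


def summarize_leases(items):
    """Return (shard, owner, checkpoint) rows sorted by shard id.

    items are plain dicts keyed by lease-table attribute names; missing
    attributes are reported as None.
    """
    rows = [(item.get("leaseKey"), item.get("leaseOwner"), item.get("checkpoint"))
            for item in items]
    return sorted(rows, key=lambda row: row[0] or "")


def leases_per_owner(rows):
    """Count leases held by each worker id (None for unowned leases)."""
    return dict(Counter(owner for _, owner, _ in rows))
```

For a small table, `boto3.resource("dynamodb").Table(name).scan()["Items"]` returns dicts of this shape (larger tables need pagination). Comparing owners and checkpoints for `shardId-000000000007` and `shardId-000000000008` before and after the long sleep would show whether the checkpoints move at all.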

Recommended way of debugging a custom RecordProcessor

Looking at RecordProcessor::process_records

def process_records(self, process_records_input):
    ...
    try:
        for record in process_records_input.records:
            ...

    except Exception as e:
        sys.stderr.write("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

I see that any exception raised in the try block will be written to stderr. However, the stderr buffer is not flushed immediately. As a result, error messages are not displayed in the MultiLangDaemon's stdout, which makes debugging harder.

In your opinion, what would be the best solution to this problem? Here are some ideas:

  • disable output buffering (executableName = python -u myscript.py)
  • modify RecordProcessor so that it uses amazon_kclpy.kcl._IOHandler, e.g.:
def process_records(self, process_records_input):
    ...
    try:
        for record in process_records_input.records:
            ...

    except Exception as e:
        # self.iohandler would have to be injected somewhere
        self.iohandler.write_error("Encountered an exception while processing records. Exception was {e}\n".format(e=e))
  • handle exceptions myself from MyCustomRecordProcessor at the process_record level:
import logging

logger = logging.getLogger()

class MyCustomRecordProcessor(RecordProcessor):
    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        try:
            ...  # processing logic that may raise goes here
        except Exception as e:
            logger.error("Something wrong happened: {e}".format(e=e))
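A variant of the third option, sketched under the assumption that you control the loop over records: catch exceptions per record, log them with `logger.exception` (which includes the traceback) through a handler bound to stderr, and keep going. `logging.StreamHandler` flushes after every record it emits, which sidesteps the buffering concern. `process_batch` and its `handle` argument are hypothetical names.

```python
import logging
import sys

logger = logging.getLogger("kcl_processor")
if not logger.handlers:
    # StreamHandler.emit() flushes after every record, so nothing waits in a buffer.
    logger.addHandler(logging.StreamHandler(sys.stderr))
logger.setLevel(logging.INFO)


def process_batch(records, handle):
    """Call handle(record) for each record; log and count failures instead of stopping.

    Returns the number of records whose handler raised.
    """
    failures = 0
    for record in records:
        try:
            handle(record)
        except Exception:
            failures += 1
            logger.exception("Failed to process record %r", record)
    return failures
```

The trade-off: if the processor later checkpoints past a failed record, that record is not retried, so a non-zero failure count may warrant a dead-letter step or skipping the checkpoint for that batch.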

"Attempting to checkpoint at sequence number" repeated forever

Hi, first of all, thanks a lot for this project, much appreciated!

Unfortunately, I'm having a hard time with checkpointing: it seems that the daemon keeps replying to all previous checkpointing requests, and I don't know why. I start the sample application, initialization goes fine, then the first ProcessRecordsMessage is sent, the client starts to process data, and the first checkpoint request happens, e.g.:

Sep 16, 2016 1:49:53 AM com.amazonaws.services.kinesis.multilang.MultiLangProtocol checkpoint
INFO: Attempting to checkpoint at sequence number 49565688898378909600883113540136141255235645706373955586 for shard shardId-000000000000

The problem is that the very same "attempting to checkpoint" line with the same sequence number shows up after every subsequent ProcessRecordsMessage, e.g. after a few iterations:

Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing ProcessRecordsMessage to child process for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MessageWriter call
INFO: Message size == 24749 bytes for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MultiLangProtocol checkpoint
INFO: Attempting to checkpoint at sequence number 49565688898378909600883113547960309159781526687145000962 for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing CheckpointMessage to child process for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MessageWriter call
INFO: Message size == 130 bytes for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MultiLangProtocol checkpoint
INFO: Attempting to checkpoint at sequence number 49565688898378909600883113556615009102402658048378142722 for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing CheckpointMessage to child process for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MessageWriter call
INFO: Message size == 130 bytes for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MultiLangProtocol checkpoint
INFO: Attempting to checkpoint at sequence number 49565688898378909600883113540136141255235645706373955586 for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing CheckpointMessage to child process for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MessageWriter call
INFO: Message size == 130 bytes for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MultiLangProtocol checkpoint
INFO: Attempting to checkpoint at sequence number 49565688898378909600883113561025170492356826033620516866 for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing CheckpointMessage to child process for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MessageWriter call
INFO: Message size == 108 bytes for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MultiLangProtocol checkpoint
INFO: Attempting to checkpoint at sequence number 49565688898378909600883113552833489138648097371421933570 for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing CheckpointMessage to child process for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.MessageWriter call
INFO: Message size == 130 bytes for shard shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000000
Sep 16, 2016 1:50:22 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000000

Other than the extremely heavy logging, it seems to work fine: the checkpoint field is updated frequently in DynamoDB. But I have no idea why there are so many checkpointing requests (most of them fail, because the most recent checkpointing request included a more recent sequence number). Any ideas?
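The log above shows one checkpoint request per record, at sequence numbers that are not in increasing order. A common way to cut this down is to remember the largest sequence number seen and checkpoint only at that number, at most once per interval. Below is a minimal sketch of that bookkeeping; CheckpointTracker is a hypothetical helper, not part of amazon_kclpy, and the 60-second default is an arbitrary choice.

```python
import time


class CheckpointTracker:
    """Hypothetical helper: track the largest sequence number seen and
    decide when one checkpoint is due, instead of checkpointing per record."""

    def __init__(self, interval_seconds=60.0, clock=time.monotonic):
        self.interval_seconds = interval_seconds
        self._clock = clock
        self._largest_seq = None
        self._last_checkpoint = clock()

    def observe(self, sequence_number):
        # Kinesis sequence numbers arrive as decimal strings; compare them as
        # integers so an out-of-order record never moves the target backwards.
        seq = int(sequence_number)
        if self._largest_seq is None or seq > self._largest_seq:
            self._largest_seq = seq

    def due(self):
        # Checkpoint only if something was seen and the interval has elapsed.
        return (self._largest_seq is not None
                and self._clock() - self._last_checkpoint >= self.interval_seconds)

    def take(self):
        # Return the sequence number to checkpoint at (as a string) and restart the timer.
        self._last_checkpoint = self._clock()
        return str(self._largest_seq)
```

Inside process_records you would call observe() for every record and, once per batch when due() is true, pass take() to the checkpointer. In amazon_kclpy v2 that is assumed to be process_records_input.checkpointer.checkpoint(...); check the API of the version you have installed.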

Persist State Across App Shutdowns/Credentials Store

Two questions:

  1. How can I persist the last processed sequence ID? Should I store it myself and initialize the KCL with that sequence ID as part of the initialize and shutdown methods? I want to be able to shut down the KCL app, restart it, and have it pick up where it left off. I know that while the app is running the state is persisted in DynamoDB, but it would be great to also persist it across shutdowns.

  2. Is it possible to get the credentials programmatically? We use a special secret vault, and I would love a way to pass credentials in during the initialize or startup method rather than having to hard-code them into the environment. Another option I considered is to wrap the KCL library and programmatically add them to the environment before calling kcl.KCLProcess().
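On question 2: the Python record processor does not call AWS itself; the Java MultiLangDaemon does, using the credentials provider configured in the properties file. If that provider is the default chain (as in the samples), it includes an environment-variable provider that reads AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and, optionally, AWS_SESSION_TOKEN. A sketch of the wrapper idea under that assumption: fetch the secrets yourself and start the daemon with them in its environment only. fetch_from_vault is a hypothetical placeholder for your vault client, and static environment variables are never refreshed, so short-lived credentials would need a restart or a different provider.

```python
import os
import subprocess


def fetch_from_vault():
    """Hypothetical placeholder: replace with a call to your secret vault."""
    raise NotImplementedError("look up credentials in your vault here")


def build_daemon_env(creds, base_env=None):
    """Copy the current environment and add AWS credentials for the child only."""
    env = dict(os.environ if base_env is None else base_env)
    env["AWS_ACCESS_KEY_ID"] = creds["access_key"]
    env["AWS_SECRET_ACCESS_KEY"] = creds["secret_key"]
    if creds.get("session_token"):
        env["AWS_SESSION_TOKEN"] = creds["session_token"]
    return env


def launch_daemon(java_command, creds):
    """Start the MultiLangDaemon with the injected environment.

    java_command is a list of arguments, e.g. the output of
    amazon_kclpy_helper.py --print_command split with shlex.split().
    """
    return subprocess.Popen(java_command, env=build_daemon_env(creds))
```

Usage would look like launch_daemon(shlex.split(printed_command), fetch_from_vault()).wait(). The credentials never touch your shell environment, only the daemon process and the record processors it spawns.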

java.io.IOException: error=2, No such file or directory

@knorwood - Firstly, thanks for getting this out! Really great stuff.

I've hit a small issue in that Java can't find my program:

java.io.IOException: Cannot run program "consumers.py": error=2, No such file or directory

I've set executableName = consumers.py in my properties file. Is that path relative to the directory in which I'm running the command, or is it absolute? (I've tried both, with no luck.)

Also, I can't find much on the recommended way to daemonize the process. Is there one?

Thanks!
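error=2 (ENOENT) means the operating system could not find the program the daemon tried to start. The sketch below checks what an executableName value would resolve to. It assumes that the daemon splits executableName on spaces (settings such as executableName = python app.py rely on this), that a bare name such as consumers.py is searched for on PATH rather than in the current directory, and that a name containing a slash is resolved relative to the java process's working directory. A script started directly also needs execute permission and a shebang line; otherwise, prefix it with the interpreter. parse_properties and resolve_executable are hypothetical helpers.

```python
import os
import shlex
import shutil


def parse_properties(text):
    """Very small key = value parser, enough for simple KCL .properties files."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # skip blanks and comments
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def resolve_executable(executable_name):
    """Return the path the first word of executableName would run, or None."""
    program = shlex.split(executable_name)[0]
    if os.sep in program:
        # Path given: relative paths resolve against the current working directory.
        if os.path.isfile(program) and os.access(program, os.X_OK):
            return program
        return None
    # Bare name: searched on PATH, not in the current directory.
    return shutil.which(program)
```

For example, if resolve_executable(parse_properties(open("app.properties").read())["executableName"]) returns None when run from the directory you start java in, the daemon is likely to hit the same error=2.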

Failed building wheel for amazon-kclpy

I'm trying to install this module, but it fails when building the wheel:

  Running setup.py bdist_wheel for amazon-kclpy ... error
  Complete output from command /Users/khashayar/Documents/Projects/Work/Terrene/TerreneCore/env/bin/python3.5 -u -c "import setuptools, tokenize;__file__='/private/var/folders/g4/gbyhwndn1sgbqgpnx0d9sb780000gn/T/pip-build-xn64fd3j/amazon-kclpy/setup.py';exec(compile(getattr(tokenize, 'open', open)(__file__).read().replace('\r\n', '\n'), __file__, 'exec'))" bdist_wheel -d /var/folders/g4/gbyhwndn1sgbqgpnx0d9sb780000gn/T/tmpioptd5o2pip-wheel- --python-tag cp35:
  running bdist_wheel
  running build
  running build_py
  creating build
  creating build/lib
  creating build/lib/amazon_kclpy
  copying amazon_kclpy/__init__.py -> build/lib/amazon_kclpy
  copying amazon_kclpy/kcl.py -> build/lib/amazon_kclpy
  creating build/lib/samples
  copying samples/__init__.py -> build/lib/samples
  copying samples/amazon_kclpy_helper.py -> build/lib/samples
  copying samples/sample_kclpy_app.py -> build/lib/samples
  copying samples/sample_kinesis_wordputter.py -> build/lib/samples
  creating build/lib/amazon_kclpy/jars
  copying amazon_kclpy/jars/__init__.py -> build/lib/amazon_kclpy/jars
  copying samples/sample.properties -> build/lib/samples
  running build_scripts
  creating build/scripts-3.5
  copying samples/__init__.py -> build/scripts-3.5
  copying and adjusting samples/amazon_kclpy_helper.py -> build/scripts-3.5
  copying and adjusting samples/sample_kclpy_app.py -> build/scripts-3.5
  copying and adjusting samples/sample_kinesis_wordputter.py -> build/scripts-3.5
  changing mode of build/scripts-3.5/__init__.py from 644 to 755
  changing mode of build/scripts-3.5/amazon_kclpy_helper.py from 644 to 755
  changing mode of build/scripts-3.5/sample_kclpy_app.py from 644 to 755
  changing mode of build/scripts-3.5/sample_kinesis_wordputter.py from 644 to 755
  installing to build/bdist.macosx-10.12-x86_64/wheel
  running install
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/amazonaws/amazon-kinesis-client/1.6.4/amazon-kinesis-client-1.6.4.jar
  Saving http://search.maven.org/remotecontent?filepath=com/amazonaws/amazon-kinesis-client/1.6.4/amazon-kinesis-client-1.6.4.jar -> amazon_kclpy/jars/amazon-kinesis-client-1.6.4.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-cloudwatch/1.11.14/aws-java-sdk-cloudwatch-1.11.14.jar
  Saving http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-cloudwatch/1.11.14/aws-java-sdk-cloudwatch-1.11.14.jar -> amazon_kclpy/jars/aws-java-sdk-cloudwatch-1.11.14.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-core/1.11.14/aws-java-sdk-core-1.11.14.jar
  Saving http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-core/1.11.14/aws-java-sdk-core-1.11.14.jar -> amazon_kclpy/jars/aws-java-sdk-core-1.11.14.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-dynamodb/1.11.14/aws-java-sdk-dynamodb-1.11.14.jar
  Saving http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-dynamodb/1.11.14/aws-java-sdk-dynamodb-1.11.14.jar -> amazon_kclpy/jars/aws-java-sdk-dynamodb-1.11.14.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-kinesis/1.11.14/aws-java-sdk-kinesis-1.11.14.jar
  Saving http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-kinesis/1.11.14/aws-java-sdk-kinesis-1.11.14.jar -> amazon_kclpy/jars/aws-java-sdk-kinesis-1.11.14.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-kms/1.11.14/aws-java-sdk-kms-1.11.14.jar
  Saving http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-kms/1.11.14/aws-java-sdk-kms-1.11.14.jar -> amazon_kclpy/jars/aws-java-sdk-kms-1.11.14.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-s3/1.11.14/aws-java-sdk-s3-1.11.14.jar
  Saving http://search.maven.org/remotecontent?filepath=com/amazonaws/aws-java-sdk-s3/1.11.14/aws-java-sdk-s3-1.11.14.jar -> amazon_kclpy/jars/aws-java-sdk-s3-1.11.14.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/fasterxml/jackson/core/jackson-annotations/2.6.0/jackson-annotations-2.6.0.jar
  Saving http://search.maven.org/remotecontent?filepath=com/fasterxml/jackson/core/jackson-annotations/2.6.0/jackson-annotations-2.6.0.jar -> amazon_kclpy/jars/jackson-annotations-2.6.0.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/fasterxml/jackson/core/jackson-core/2.6.6/jackson-core-2.6.6.jar
  Saving http://search.maven.org/remotecontent?filepath=com/fasterxml/jackson/core/jackson-core/2.6.6/jackson-core-2.6.6.jar -> amazon_kclpy/jars/jackson-core-2.6.6.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/fasterxml/jackson/core/jackson-databind/2.6.6/jackson-databind-2.6.6.jar
  Saving http://search.maven.org/remotecontent?filepath=com/fasterxml/jackson/core/jackson-databind/2.6.6/jackson-databind-2.6.6.jar -> amazon_kclpy/jars/jackson-databind-2.6.6.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/fasterxml/jackson/dataformat/jackson-dataformat-cbor/2.6.6/jackson-dataformat-cbor-2.6.6.jar
  Saving http://search.maven.org/remotecontent?filepath=com/fasterxml/jackson/dataformat/jackson-dataformat-cbor/2.6.6/jackson-dataformat-cbor-2.6.6.jar -> amazon_kclpy/jars/jackson-dataformat-cbor-2.6.6.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/google/guava/guava/18.0/guava-18.0.jar
  Saving http://search.maven.org/remotecontent?filepath=com/google/guava/guava/18.0/guava-18.0.jar -> amazon_kclpy/jars/guava-18.0.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=com/google/protobuf/protobuf-java/2.6.1/protobuf-java-2.6.1.jar
  Saving http://search.maven.org/remotecontent?filepath=com/google/protobuf/protobuf-java/2.6.1/protobuf-java-2.6.1.jar -> amazon_kclpy/jars/protobuf-java-2.6.1.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=commons-codec/commons-codec/1.9/commons-codec-1.9.jar
  Saving http://search.maven.org/remotecontent?filepath=commons-codec/commons-codec/1.9/commons-codec-1.9.jar -> amazon_kclpy/jars/commons-codec-1.9.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=commons-lang/commons-lang/2.6/commons-lang-2.6.jar
  Saving http://search.maven.org/remotecontent?filepath=commons-lang/commons-lang/2.6/commons-lang-2.6.jar -> amazon_kclpy/jars/commons-lang-2.6.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar
  Saving http://search.maven.org/remotecontent?filepath=commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar -> amazon_kclpy/jars/commons-logging-1.1.3.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=joda-time/joda-time/2.8.1/joda-time-2.8.1.jar
  Saving http://search.maven.org/remotecontent?filepath=joda-time/joda-time/2.8.1/joda-time-2.8.1.jar -> amazon_kclpy/jars/joda-time-2.8.1.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=org/apache/httpcomponents/httpclient/4.5.2/httpclient-4.5.2.jar
  Saving http://search.maven.org/remotecontent?filepath=org/apache/httpcomponents/httpclient/4.5.2/httpclient-4.5.2.jar -> amazon_kclpy/jars/httpclient-4.5.2.jar
  Attempting to retrieve remote jar http://search.maven.org/remotecontent?filepath=org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar
  Saving http://search.maven.org/remotecontent?filepath=org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar -> amazon_kclpy/jars/httpcore-4.4.4.jar
  Checking .pth file support in build/bdist.macosx-10.12-x86_64/wheel/
  /Users/khashayar/Documents/Projects/Work/Terrene/TerreneCore/env/bin/python3.5 -E -c pass
  TEST FAILED: build/bdist.macosx-10.12-x86_64/wheel/ does NOT support .pth files
  error: bad install directory or PYTHONPATH

  You are attempting to install a package to a directory that is not
  on PYTHONPATH and which Python does not read ".pth" files from.  The
  installation directory you specified (via --install-dir, --prefix, or
  the distutils default setting) was:

      build/bdist.macosx-10.12-x86_64/wheel/

  and your PYTHONPATH environment variable currently contains:

      ''

  Here are some of your options for correcting the problem:

  * You can choose a different installation directory, i.e., one that is
    on PYTHONPATH or supports .pth files

  * You can add the installation directory to the PYTHONPATH environment
    variable.  (It must then also be on PYTHONPATH whenever you run
    Python and want to use the package(s) you are installing.)

  * You can set up the installation directory to support ".pth" files by
    using one of the approaches described here:

    https://setuptools.readthedocs.io/en/latest/easy_install.html#custom-installation-locations


  Please make the appropriate changes for your system and try again.


Python version: 3.5.2
OS X: 10.12 (also tried on 10.11, same error)

KinesisClientLibIOException Shard [shardId-000000000000] is not closed

I'm getting this error with Kinesalite. It worked fine the first time I ran the KCL Python app, but ever since I deleted the local Kinesalite stream and the local DynamoDB table, I've been getting the error below. Please help!

SEVERE: Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException: Shard [shardId-000000000000] is not closed. This can happen if we constructed the list of shards while a reshard operation was in progress.
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer.assertClosedShardsAreCoveredOrAbsent(ShardSyncer.java:206)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer.cleanupLeasesOfFinishedShards(ShardSyncer.java:652)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer.syncShardLeases(ShardSyncer.java:141)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer.checkAndCreateLeasesForNewShards(ShardSyncer.java:88)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask.call(ShardSyncTask.java:68)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:427)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:356)
at com.amazonaws.services.kinesis.multilang.MultiLangDaemon.call(MultiLangDaemon.java:114)
at com.amazonaws.services.kinesis.multilang.MultiLangDaemon.call(MultiLangDaemon.java:61)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1152)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
at java.lang.Thread.run(Thread.java:748)

Cannot run Kinesis apps due to "Reached end of STDIN of child process for shard"

Hi,

We are developing a Kinesis app with this Python library. During development, we encountered an exception that stops our Kinesis app: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message. We believe the Python script is fine, because it worked before, and we don't understand what this error message means. Any idea what causes it?

Our EC2 instance runs three Kinesis apps with the following commands. The apps consume different Kinesis streams, and only one of them has this problem.

$(./amazon_kclpy_helper.py -j $(which java) -p app_1.properties --print_command)
$(./amazon_kclpy_helper.py -j $(which java) -p app_2.properties --print_command)
$(./amazon_kclpy_helper.py -j $(which java) -p app_3.properties --print_command)

The following is the console output from the broken Kinesis app.

Dec 11, 2014 10:07:26 AM com.amazonaws.services.kinesis.multilang.MultiLangProtocol waitForStatusMessage
SEVERE: Failed to get status message for initialize action for shard shardId-000000000000
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message.
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:188)
        at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:141)
        at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:116)
        at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.initialize(MultiLangProtocol.java:70)
        at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.initialize(MultiLangRecordProcessor.java:131)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask.call(InitializeTask.java:74)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message.
        at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:84)
        at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:31)
        at com.amazonaws.services.kinesis.multilang.LineReaderTask.call(LineReaderTask.java:70)
        ... 4 more

Dec 11, 2014 10:07:26 AM com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor stopProcessing
SEVERE: Encountered an error while trying to initialize record processor
java.lang.RuntimeException: Failed to initialize child process
        at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.initialize(MultiLangRecordProcessor.java:132)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask.call(InitializeTask.java:74)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Dec 11, 2014 10:07:26 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Stopping: Reading STDERR for shardId-000000000000
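The daemon reports "Reached end of STDIN of child process" when the record-processor process closes its output (typically because it exited) before answering the initialize message, so the thing to find is why the child died, for example an exception at import time. One way to reproduce this outside the daemon is to start the child command yourself, send it a single initialize line, and see whether a status reply comes back. This sketch assumes the MultiLang protocol's JSON-line format ({"action": "initialize", ...} in, {"action": "status", "responseFor": "initialize"} out); the exact fields the daemon sends vary by version, so compare them against the kcl.py you have installed. smoke_test_initialize is a hypothetical helper.

```python
import json
import subprocess


def smoke_test_initialize(command, shard_id="shardId-000000000000", timeout=10.0):
    """Send one initialize message to a record-processor command, mimicking the
    daemon (message fields are assumptions), and check for a status reply.
    Returns (ok, stderr_text)."""
    message = json.dumps({"action": "initialize", "shardId": shard_id,
                          "sequenceNumber": "TRIM_HORIZON",
                          "subSequenceNumber": 0}) + "\n"
    proc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, text=True)
    try:
        out, err = proc.communicate(message, timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        out, err = proc.communicate()
    lines = [line for line in out.splitlines() if line.strip()]
    if not lines:
        # The child closed stdout without answering: the daemon's "end of STDIN" case.
        return False, err
    try:
        reply = json.loads(lines[0])
    except ValueError:
        # Anything non-JSON on stdout (e.g. a stray print) also breaks the protocol.
        return False, err + "\nnon-JSON on stdout: " + lines[0]
    if not isinstance(reply, dict):
        return False, err
    ok = reply.get("action") == "status" and reply.get("responseFor") == "initialize"
    return ok, err
```

For instance, smoke_test_initialize(["python", "app_1.py"]) returning (False, traceback_text) would point at the same failure the daemon logged, with the child's own error message attached.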

Problem trying to refresh assume-role credentials automatically with KCL on EC2

I have a Kinesis stream reader set up on an EC2 instance with an instance profile that has permission to assume a reader role for a Kinesis stream in a separate AWS account. I can access the stream perfectly well if I use STS through the AWS CLI to retrieve temporary credentials and set up the ~/.aws/credentials file accordingly. However, I run into problems when I try to use the STSAssumeRoleSessionCredentialsProvider in kcl.properties to refresh the temporary credentials automatically.

The credentials setting in kcl.properties is as follows:

AWSCredentialsProvider = STSAssumeRoleSessionCredentialsProvider|arn:aws:iam::<account-id>:role/<role-name>|session-name

But I seem to be running into the following issue.

No credential providers specified
java com.amazonaws.services.kinesis.multilang.MultiLangDaemon <properties file>

I'm using Amazon KCL version 1.4.4 with the following JARs:

  • amazon-kinesis-client-1.7.5.jar
  • aws-java-sdk-cloudwatch-1.11.115.jar
  • aws-java-sdk-core-1.11.115.jar
  • aws-java-sdk-dynamodb-1.11.115.jar
  • aws-java-sdk-kinesis-1.11.115.jar
  • aws-java-sdk-kms-1.11.115.jar
  • aws-java-sdk-s3-1.11.115.jar
  • commons-codec-1.9.jar
  • commons-lang-2.6.jar
  • commons-logging-1.1.3.jar
  • guava-18.0.jar
  • httpclient-4.5.2.jar
  • httpcore-4.4.4.jar
  • ion-java-1.0.2.jar
  • jackson-annotations-2.6.0.jar
  • jackson-core-2.6.6.jar
  • jackson-databind-2.6.6.jar
  • jackson-dataformat-cbor-2.6.6.jar
  • jmespath-java-1.11.115.jar
  • joda-time-2.8.1.jar
  • protobuf-java-2.6.1.jar

Any help with missing JARs or configuration specifics would be appreciated.
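One thing worth checking: the JAR list above has no aws-java-sdk-sts JAR, which is the AWS SDK for Java module that contains com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider. If the daemon cannot load the class named in AWSCredentialsProvider, it may end up with no usable providers. The sketch below scans a JAR directory for that class file; jars_containing is a hypothetical helper, and the directory you pass is wherever your classpath JARs live.

```python
import glob
import os
import zipfile

# Class file path inside a JAR for the provider named in kcl.properties.
STS_CLASS_ENTRY = "com/amazonaws/auth/STSAssumeRoleSessionCredentialsProvider.class"


def jars_containing(jar_dir, class_entry=STS_CLASS_ENTRY):
    """Return the names of JARs in jar_dir that contain class_entry."""
    hits = []
    for jar_path in sorted(glob.glob(os.path.join(jar_dir, "*.jar"))):
        try:
            with zipfile.ZipFile(jar_path) as jar:
                if class_entry in jar.namelist():
                    hits.append(os.path.basename(jar_path))
        except zipfile.BadZipFile:
            continue  # not a valid JAR; ignore it
    return hits
```

An empty result for your JAR directory would suggest adding an aws-java-sdk-sts JAR matching the other SDK versions (1.11.115 here) to the classpath.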

Running in containers causes problems due to stdout conflicts

Hi there,

I'm running the KCL in a Docker container. Because my code writes log entries to stdout (to make Docker log aggregation easier), they conflict with the KCL's messages; this produces unnecessary errors and in some cases causes the KCL to crash.

I'll get output like this coming through:

SEVERE: Received error line from subprocess [[2017-09-13 02:16:10.104][scheduler][INFO]: Deduplicated stream items - original length: 1, filtered length: 1] for shard shardId-000000000000

Is there any way to get around this other than logging to separate files? Ideally, I could configure it to use a custom Unix socket (similar to how, say, php-fpm or nginx-uwsgi work) so that it doesn't conflict with services logging to stdout/stderr.
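The daemon uses the child's stdout as its protocol channel, so anything else written there (print calls, a logging handler on stdout) can corrupt messages. A minimal sketch, assuming you control the app's logging setup: route all application logs to another stream, stderr by default. As the output above shows, the daemon relays stderr lines into its own log marked SEVERE, so this keeps the protocol intact without making those lines look any better. For something closer to the socket idea, the standard library's logging.handlers.SysLogHandler accepts a Unix socket path as its address. configure_app_logging is a hypothetical helper.

```python
import logging
import sys

LOG_FORMAT = "[%(asctime)s][%(name)s][%(levelname)s]: %(message)s"


def configure_app_logging(stream=None, level=logging.INFO):
    """Send application logs to `stream` (stderr by default), never to stdout,
    which the MultiLangDaemon reads as protocol messages."""
    stream = sys.stderr if stream is None else stream
    if stream is sys.stdout or stream is sys.__stdout__:
        raise ValueError("stdout is reserved for KCL protocol messages")
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(LOG_FORMAT))
    root = logging.getLogger()
    root.handlers[:] = [handler]  # drop any handler that might write to stdout
    root.setLevel(level)
    return root
```

Call it once at the top of the record-processor script, before kcl.KCLProcess(...).run(), and avoid bare print() anywhere in the processor.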

initialPositionInStream AT_TIMESTAMP

Is there a way to set initialPositionInStream to AT_TIMESTAMP? Both TRIM_HORIZON and LATEST return some records when the app first starts, even if they have already been processed.
So I was hoping to set AT_TIMESTAMP to now and get only the messages that arrive after that.

Upgrade to latest KCL?

The Python client currently uses KCL 1.7.5, and many KCL versions have been released since then. Can we get a new version of this package soon that catches up (the latest is 1.8.5 as I write this)?

Sample helper script doesn't work on Windows

The helper script generates a command for UNIX systems. On UNIX, JARs in the classpath are separated by colons, but on Windows the separator has to be a semicolon.

So, in amazon_kclpy_helper.py, I changed return ':'.join(... to return ';'.join(... . There were two places that needed to be changed.

Also, in the sample.properties file, set executableName = python my_kclpy_app.py.
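A portable alternative to hard-coding either separator is Python's os.pathsep, which is ':' on POSIX systems and ';' on Windows, matching what java expects for -cp. A minimal sketch, assuming the JARs sit in a single directory; kcl_classpath is a hypothetical helper, not the function in amazon_kclpy_helper.py.

```python
import glob
import os


def kcl_classpath(jar_dir, extra_paths=()):
    """Join every JAR in jar_dir plus extra_paths (e.g. the directory holding the
    .properties file) using the platform's classpath separator."""
    jars = sorted(glob.glob(os.path.join(jar_dir, "*.jar")))
    return os.pathsep.join(jars + [os.path.abspath(p) for p in extra_paths])
```

Using os.pathsep in the helper's join calls would let the same script produce a valid command on both platforms.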

how to use initialize_input parameter

I am using the KCL for Python to develop my consumer code. In the template provided by the KCL, the initialize method has an initialize_input parameter that is not used. According to the documentation, this variable indicates which shard the consumer will read. Is it an integer or a string? I tried both 1 and "shardId-000000000001"; however, the log shows that the consumer is still reading records from all the shards. Are there any tutorials or examples showing how to make a consumer consume data from only one shard?
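initialize_input is not a setting you pass in; it is input the daemon hands to each record processor. The KCL runs one processor per shard it has leased, and each processor learns its shard from this parameter, so which shards a worker handles is decided by lease assignment, not by this value. A duck-typed sketch, assuming the amazon_kclpy v2 message objects expose initialize_input.shard_id and records with a sequence_number attribute (check your installed version); in a real app the class would subclass kcl.RecordProcessorBase.

```python
class ShardAwareProcessor:
    """Duck-typed stand-in for a kcl.RecordProcessorBase subclass."""

    def __init__(self):
        self.shard_id = None
        self.seen = []

    def initialize(self, initialize_input):
        # Read-only: the daemon tells this processor which shard it owns,
        # e.g. "shardId-000000000001".
        self.shard_id = initialize_input.shard_id

    def process_records(self, process_records_input):
        # Every record delivered to this processor comes from self.shard_id.
        for record in process_records_input.records:
            self.seen.append((self.shard_id, record.sequence_number))
```

Logging self.shard_id in initialize is a simple way to see which shards a given worker was assigned.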
