
amazon-kinesis-connectors's Introduction

Amazon Kinesis Connector Library

The Amazon Kinesis Connector Library helps Java developers integrate Amazon Kinesis with other AWS and non-AWS services. The current version of the library provides connectors for Amazon DynamoDB, Amazon Redshift, Amazon S3, and Elasticsearch. The library also includes sample connectors of each type, plus Apache Ant build files for running the samples.

Requirements

  • Amazon Kinesis Client Library: In order to use the Amazon Kinesis Connector Library, you'll also need the Amazon Kinesis Client Library.
  • Java 1.7: The Amazon Kinesis Client Library requires Java 1.7 (Java SE 7) or later.
  • Elasticsearch 1.2.1: The Elasticsearch connector depends on Elasticsearch 1.2.1.
  • SQL driver (Amazon Redshift only): If you're using an Amazon Redshift connector, you'll need a driver that will allow your SQL client to connect to your Amazon Redshift cluster. For more information, see Download the Client Tools and the Drivers in the Amazon Redshift Getting Started Guide.

Overview

Each Amazon Kinesis connector application is a pipeline that determines how records from an Amazon Kinesis stream will be handled. Records are retrieved from the stream, transformed according to a user-defined data model, buffered for batch processing, and then emitted to the appropriate AWS service.

A connector pipeline uses the following interfaces:

  • IKinesisConnectorPipeline: The pipeline implementation itself.
  • ITransformer: Defines the transformation of records from the Amazon Kinesis stream to suit the user-defined data model. Includes methods for custom serializers/deserializers.
  • IFilter: Defines a method for excluding irrelevant records from processing.
  • IBuffer: Defines a system for batching the set of records to be processed. The application can specify three thresholds: number of records, total byte count, and time. When one of these thresholds is crossed, the buffer is flushed and the data is emitted to the destination.
  • IEmitter: Defines a method that makes client calls to other AWS services and persists the records stored in the buffer. The records can also be sent to another Amazon Kinesis stream.

Each connector depends on the implementation of KinesisConnectorRecordProcessor to manage the pipeline. The KinesisConnectorRecordProcessor class implements the IRecordProcessor interface in the Amazon Kinesis Client Library.
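
To make the flow concrete, here is a minimal pass-through pipeline sketch. The class name and the byte[]-to-byte[] "model" are hypothetical simplifications; the interfaces, AllPassFilter, BasicMemoryBuffer, and S3Emitter are from the library, with package names as suggested by the stack traces quoted later on this page.

import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.impl.AllPassFilter;
import com.amazonaws.services.kinesis.connectors.impl.BasicMemoryBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.IBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;
import com.amazonaws.services.kinesis.connectors.interfaces.IFilter;
import com.amazonaws.services.kinesis.connectors.interfaces.IKinesisConnectorPipeline;
import com.amazonaws.services.kinesis.connectors.interfaces.ITransformer;
import com.amazonaws.services.kinesis.connectors.s3.S3Emitter;
import com.amazonaws.services.kinesis.model.Record;

// Hypothetical pass-through pipeline: raw record bytes (T) are buffered and
// emitted unchanged to S3 (U). Real pipelines usually transform records into a
// model class such as the samples' KinesisMessageModel instead of byte[].
public class PassThroughS3Pipeline implements IKinesisConnectorPipeline<byte[], byte[]> {

    @Override
    public ITransformer<byte[], byte[]> getTransformer(KinesisConnectorConfiguration config) {
        return new ITransformer<byte[], byte[]>() {
            @Override
            public byte[] toClass(Record record) {       // stream record -> data model
                return record.getData().array();
            }

            @Override
            public byte[] fromClass(byte[] record) {     // data model -> emitter format
                return record;
            }
        };
    }

    @Override
    public IFilter<byte[]> getFilter(KinesisConnectorConfiguration config) {
        return new AllPassFilter<byte[]>();              // keep every record
    }

    @Override
    public IBuffer<byte[]> getBuffer(KinesisConnectorConfiguration config) {
        return new BasicMemoryBuffer<byte[]>(config);    // flush on count/size/time thresholds
    }

    @Override
    public IEmitter<byte[]> getEmitter(KinesisConnectorConfiguration config) {
        return new S3Emitter(config);                    // writes each flushed buffer to one S3 file
    }
}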

Implementation Highlights

The library includes implementations for use with Amazon DynamoDB, Amazon Redshift, Amazon S3, and Elasticsearch. This section provides a few notes about each connector type. For full details, see the samples and the Javadoc.

kinesis.connectors.dynamodb

  • DynamoDBTransformer: Implement the fromClass method to map your data model to a format that's compatible with the AmazonDynamoDB client (Map<String,AttributeValue>); a sketch follows this list.
  • For more information on Amazon DynamoDB formats and putting items, see Working with Items Using the AWS SDK for Java Low-Level API in the Amazon DynamoDB Developer Guide.
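
For example, the item map that a DynamoDBTransformer's fromClass method would return could be built as in the following sketch. UserRecord and its fields are hypothetical; only AttributeValue comes from the AWS SDK.

import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.dynamodbv2.model.AttributeValue;

// Sketch only: the mapping a DynamoDBTransformer's fromClass method would produce.
public class UserRecordItemMapping {

    public static class UserRecord {      // hypothetical data model
        public String username;
        public int score;
    }

    // The body you would place in fromClass(UserRecord record).
    public Map<String, AttributeValue> fromClass(UserRecord record) {
        Map<String, AttributeValue> item = new HashMap<String, AttributeValue>();
        item.put("username", new AttributeValue().withS(record.username));             // string attribute
        item.put("score", new AttributeValue().withN(Integer.toString(record.score))); // numbers are passed as strings
        return item;
    }
}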

kinesis.connectors.redshift

  • RedshiftTransformer: Implement the toDelimitedString method to output a delimited-string representation of your data model; a sketch follows this list. The string must be compatible with an Amazon Redshift COPY command.
  • For more information about Amazon Redshift copy operations and manifests, see COPY and Using a manifest to specify data files in the Amazon Redshift Developer Guide.
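
For example, a toDelimitedString implementation for a hypothetical two-field record could look like the sketch below; the '|' delimiter is an assumption and must match the DELIMITER option of your COPY command, and the trailing newline ends the row.

// Sketch only: the delimited row a RedshiftTransformer's toDelimitedString would return.
public class UserRecordDelimitedRow {

    public static class UserRecord {      // hypothetical data model
        public String username;
        public int score;
    }

    public String toDelimitedString(UserRecord record) {
        return record.username + "|" + record.score + "\n";
    }
}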

kinesis.connectors.s3

  • S3Emitter: This class writes the buffer contents to a single file in Amazon S3. The file name is determined by the Amazon Kinesis sequence numbers of the first and last records in the buffer. For more information about sequence numbers, see Add Data to a Stream in the Amazon Kinesis Developer Guide.

kinesis.connectors.elasticsearch

  • KinesisMessageModelElasticsearchTransformer: This class provides an implementation of fromClass by transforming the record into JSON format and setting the index, type, and id to use for Elasticsearch (a sketch follows this list).
  • BatchedKinesisMessageModelElasticsearchTransformer: This class extends KinesisMessageModelElasticsearchTransformer. If you batch events before putting data into Kinesis, this class will help you unpack the events before loading them into Elasticsearch.
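
A minimal fromClass sketch for a hypothetical user record, assuming Jackson 2 is on the classpath for JSON serialization (as in the samples) and that ElasticsearchObject can be constructed from an index, type, id, and JSON source string, as the sample transformers suggest; the index and type names are illustrative.

import java.io.IOException;

import com.amazonaws.services.kinesis.connectors.elasticsearch.ElasticsearchObject;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: turn a (hypothetical) model object into the document to index.
public class UserRecordToElasticsearch {

    public static class UserRecord {      // hypothetical data model
        public String userid;
        public String username;
    }

    private final ObjectMapper mapper = new ObjectMapper();   // assumes Jackson 2 on the classpath

    public ElasticsearchObject fromClass(UserRecord record) throws IOException {
        String source = mapper.writeValueAsString(record);    // JSON document body
        return new ElasticsearchObject("kinesis-users",       // index (illustrative)
                "userrecord",                                  // type (illustrative)
                record.userid,                                 // document id
                source);                                       // assumed (index, type, id, source) constructor
    }
}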

Configuration

Set the following variables (common to all connector types) in kinesis.connectors.KinesisConnectorConfiguration:

  • AWSCredentialsProvider: Specify the implementation of AWSCredentialsProvider that supplies your AWS credentials.
  • APP_NAME: The Amazon Kinesis application name (not the connector application name) for use with kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration. For more information, see Developing Record Consumer Applications in the Amazon Kinesis Developer Guide.
  • KINESIS_ENDPOINT and KINESIS_INPUT_STREAM: The endpoint and name of the Kinesis stream that contains the data you're connecting to other AWS services.

Service-specific configuration variables are set in the respective emitter implementations (e.g., kinesis.connectors.dynamodb.DynamoDBEmitter).
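
For illustration, the samples assemble this configuration by loading a .properties file and handing it to KinesisConnectorConfiguration together with a credentials provider. A minimal sketch follows; the file name is hypothetical, and the sample .properties files show the exact key names to use.

import java.io.InputStream;
import java.util.Properties;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;

public class ConnectorConfigLoader {

    // Loads connector settings from a properties file on the classpath and wraps
    // them in a KinesisConnectorConfiguration. "myconnector.properties" is a
    // hypothetical file name.
    public static KinesisConnectorConfiguration load() throws Exception {
        Properties props = new Properties();
        try (InputStream in = ConnectorConfigLoader.class.getClassLoader()
                .getResourceAsStream("myconnector.properties")) {
            props.load(in);   // throws if the file is missing (in == null)
        }
        return new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain());
    }
}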

Samples

The samples folder contains common classes for all the samples. The subfolders contain implementations of the pipeline and executor classes, along with Apache Ant build.xml files for running the samples.

Each sample uses the following files:

  • StreamSource.java: A simple application that sends records to an Amazon Kinesis stream.
  • users.txt: JSON records that are parsed line by line by the StreamSource program; the basis of KinesisMessageModel.
  • KinesisMessageModel.java: The data model for the users.txt records.
  • KinesisConnectorExecutor.java: An abstract implementation of an Amazon Kinesis connector application, which includes these features:
    • Configures the connector in its constructor, using the samples.utils package and the .properties file in the sample subfolder.
    • Provides the getKinesisConnectorRecordProcessorFactory() method, which is implemented by the executors in the sample subfolders; each executor returns an instance of a factory configured with the appropriate pipeline.
    • Provides a run() method for spawning a worker thread that uses the result of getKinesisConnectorRecordProcessorFactory().
  • .properties: The service-specific key-value properties for configuring the connector.
  • <service/type>Pipeline: The implementation of IKinesisConnectorPipeline for the sample. Each pipeline class returns a service-specific transformer and emitter, as well as simple buffer and filter implementations (BasicMemoryBuffer and AllPassFilter).

Running a Sample

To run a sample, complete these steps:

  1. Edit the *.properties file, adding your AWS credentials and any necessary AWS resource configurations.
  2. Confirm that the required AWS resources exist, or set the flags in the *.properties file to indicate that resources should be created when the sample is run.
  3. Build the samples using Maven:
    cd samples
    mvn package
    
  4. Run a sample using the scripts generated in target/appassembler/bin.

Release Notes

Release 1.3.0 (November 17, 2016)

  • Upgraded the Amazon Kinesis Client Library to version 1.7.2.
  • Upgraded the AWS Java SDK to 1.11.14.
  • Migrated the samples to use Maven for dependency management and execution.
  • Maven Artifact Signing Change

Release 1.2.0 (June 23, 2015)

  • Upgraded KCL to 1.4.0
  • Added pipelined record processor that decouples Amazon Kinesis GetRecords() and IRecordProcessor's ProcessRecords() API calls for efficiency.

Release 1.1.2 (May 27, 2015)

  • Upgraded AWS SDK to 1.9, KCL to 1.3.0
  • Added pom.xml file

Release 1.1.1 (Sep 11, 2014)

  • Added connector to Elasticsearch

Release 1.1 (June 30, 2014)

  • Added time threshold to IBuffer
  • Added region name support

Related Resources

Amazon Kinesis Developer Guide
Amazon Kinesis API Reference

Amazon DynamoDB Developer Guide
Amazon DynamoDB API Reference

Amazon Redshift Documentation

Amazon S3 Documentation and Videos

Elasticsearch

License

This library is licensed under the Apache 2.0 License.

amazon-kinesis-connectors's People

Contributors

afitzgibbon, alexconlin, cory-bradshaw, dependabot[bot], gauravgh, jamesiri, jawsthegame, jganoff, manango, pfifer, rmahfoud, tkawachi, wanis


amazon-kinesis-connectors's Issues

Getting the below error when I try to run the DynamoDB sample

Getting the below error when I try to run the DynamoDB sample. Thanks in advance.

[java] INFO: Created new shardConsumer for shardId: shardId-000000000000, concurrencyToken: 800bb2cc-1a82-46b9-8ec8-f5d6287c9a69
[java] Nov 25, 2014 4:18:13 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
[java] INFO: No need to block on parents [] of shard shardId-000000000000
[java] Nov 25, 2014 4:18:13 PM com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable publishMetrics
[java] INFO: Successfully published 20 datums.
[java] Nov 25, 2014 4:18:14 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
[java] INFO: Initializing shard shardId-000000000000 with TRIM_HORIZON
[java] Nov 25, 2014 4:18:14 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
[java] INFO: Initializing shard shardId-000000000001 with TRIM_HORIZON
[java] Nov 25, 2014 4:18:16 PM com.amazonaws.services.kinesis.connectors.dynamodb.DynamoDBEmitter performBatchRequest
[java] SEVERE: Amazon DynamoDB Client could not perform batch request
[java] com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: PCQTJ5602VPM99LCCU7EHGUUFFVV4KQNSO5AEMVJF66Q9ASUAAJG)
[java] at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1077)
[java] at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:725)
[java] at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460)
[java] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295)
[java] at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:3106)
[java] at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.batchWriteItem(AmazonDynamoDBClient.java:771)
[java] at com.amazonaws.services.kinesis.connectors.dynamodb.DynamoDBEmitter.performBatchRequest(Unknown Source)
[java] at com.amazonaws.services.kinesis.connectors.dynamodb.DynamoDBEmitter.emit(Unknown Source)
[java] at com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor.emit(Unknown Source)
[java] at com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor.processRecords(Unknown Source)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:125)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
[java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
[java] at java.util.concurrent.FutureTask.run(FutureTask.java:262)
[java] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[java] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[java] at java.lang.Thread.run(Thread.java:744)

GetShardIterator... invalid because it did not come from this stream

When running ant run for the redshiftmanifest sample project I'm seeing the following error:

[java] com.amazonaws.services.kinesis.model.InvalidArgumentException: Status Code: 400, AWS Service: AmazonKinesis, AWS Request ID: ba19e440-b6bd-11e3-a5c0-c73f0bed95c2, AWS Error Code: InvalidArgumentException, AWS Error Message:
StartingSequenceNumber 49537979178794861780081237450294896636708704732336619521 used in GetShardIterator on shard shardId-000000000000 in stream primaryManifestStream under account xxxxxxxxxxx is invalid because it did not come from this stream.

Here is more output from the terminal:

[java] INFO: Initialization complete. Starting worker loop.
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
[java] INFO: Worker cdbc30f111b4fbcd:-683eef15:1450a893ea4:-8000 needed 1 leases but none were expired, so it will steal lease shardId-000000000000 from b8c3c4f597fd6f29:4c01183b:1450a7aca8c:-8000
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
[java] INFO: Worker cdbc30f111b4fbcd:-683eef15:1450a893ea4:-8000 saw 2 total leases, 0 available leases, 2 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.leases.impl.LeaseTaker takeLeases
[java] INFO: Worker cdbc30f111b4fbcd:-683eef15:1450a893ea4:-8000 successfully took 1 leases: shardId-000000000000
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog infoForce
[java] INFO: Created new shardConsumer for shardId: shardId-000000000000, concurrencyToken: 8add86f6-1135-4511-b5a4-717bcf7ffbc8
[java] Mar 28, 2014 2:12:54 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
[java] INFO: No need to block on parents [] of shard shardId-000000000000
[java] Mar 28, 2014 2:12:55 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
[java] INFO: Initializing shard shardId-000000000000 with 49537979178794861780081237450294896636708704732336619521
[java] Mar 28, 2014 2:12:55 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask call
[java] SEVERE: Caught exception:
[java] com.amazonaws.services.kinesis.model.InvalidArgumentException: Status Code: 400, AWS Service: AmazonKinesis, AWS Request ID: ba19e440-b6bd-11e3-a5c0-c73f0bed95c2, AWS Error Code: InvalidArgumentException, AWS Error Message: StartingSequenceNumber 49537979178794861780081237450294896636708704732336619521 used in GetShardIterator on shard shardId-000000000000 in stream primaryManifestStream under account xxxxxxxxxxx is invalid because it did not come from this stream.
[java]  at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
[java]  at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
[java]  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
[java]  at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2152)
[java]  at com.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:406)
[java]  at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.getIterator(KinesisProxy.java:237)
[java]  at com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator.getIterator(MetricsCollectingKinesisProxyDecorator.java:121)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.getIterator(KinesisDataFetcher.java:142)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.advanceIteratorAfterSequenceNumber(KinesisDataFetcher.java:104)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.advanceIteratorAfter(KinesisDataFetcher.java:122)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.initialize(KinesisDataFetcher.java:94)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask.call(InitializeTask.java:67)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
[java]  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
[java]  at java.util.concurrent.FutureTask.run(FutureTask.java:262)
[java]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[java]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[java]  at java.lang.Thread.run(Thread.java:744)

KinesisConnectorRecordProcessor

How to handle the case where not all records in the stream have the same processing method? Do we need to override the processRecords method in KinesisConnectorRecordProcessor to add custom processing?
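
One way to keep KinesisConnectorRecordProcessor unmodified, sketched below under assumptions (a hypothetical type prefix in the payload and a plain String model), is to branch inside a single ITransformer and do the per-type work in toClass.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.connectors.interfaces.ITransformer;
import com.amazonaws.services.kinesis.model.Record;

// Sketch only: dispatch on record contents inside one transformer instead of
// overriding processRecords. The "A|"/"B|" prefix convention is hypothetical.
public class DispatchingTransformer implements ITransformer<String, byte[]> {

    @Override
    public String toClass(Record record) throws IOException {
        String payload = new String(record.getData().array(), StandardCharsets.UTF_8);
        if (payload.startsWith("A|")) {
            return handleTypeA(payload);    // per-type processing happens here
        }
        return handleTypeB(payload);
    }

    @Override
    public byte[] fromClass(String line) throws IOException {
        return line.getBytes(StandardCharsets.UTF_8);
    }

    private String handleTypeA(String payload) {
        return "A:" + payload;
    }

    private String handleTypeB(String payload) {
        return "B:" + payload;
    }
}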

Using JSONPaths to `COPY` Objects into RedShift from Kinesis?

Manually, I imported JSON data into Redshift by doing:

  • create Redshift cluster
  • log onto the Redshift DB via JDBC driver
  • run a create table ... command
  • run the COPY command

My copy command looks something like:

copy TABLE_NAME
from PATH_TO_S3_OBJECT
credentials ...
json 'PATH_TO_S3_OBJECT_JSONPATH_FILE'

My json argument pointed to a file that looked like:

{
    "jsonpaths": [
        "$.name",
        "$.phone_number"
     ]
}

Note that I used the Copy JSON approach in order to copy JSON data into Redshift columns.

Could you please tell me how to use this library for importing JSON into Redshift?

How to scale it

Hi Team,

We are evaluating this connector library. As of now we are sending records into Kinesis at a rate of 3MP/s,
and using this library we are saving the data to S3.
The connector library works well when data ingestion is slow, but as we increase the rate, it keeps saving records at the same speed...
Is there any way we can scale it out, or use auto scaling?

Following directions for running the samples results in failed build.

BUILD FAILED
/Users/aconbere/Packages/amazon/amazon-kinesis-connectors/src/main/samples/redshiftmanifest/build.xml:25: /Users/aconbere/Packages/amazon/amazon-kinesis-connectors/src/aws-java-sdk-1.6.9/third-party does not exist.

It's not clear to me exactly how the project expects those files to be present. Should I just go get the AWS SDK and put it there?

Understanding `redshiftbasic`

At a high-level, what does this sample do?

I am trying to run it, but I admit that, if I understand what it did at a high-level, I think that it'd help me to run the sample.

Does it populate data from users.txt into S3, and then put the data into Kinesis? Finally, it puts the records into Redshift (from Kinesis) depending upon either a time or byte threshold?

I'm confused what it does at a high-level.

Additionally, I had modified src/main/samples/redshiftbasic/RedshiftBasicSample.properties to create a Redshift cluster + Kinesis stream.

Yet, after I run ant run, I don't see the stream or cluster created when running aws kinesis list-streams or looking at the AWS Management Console's Cluster view for Redshift.

Compatibility with AWS Kinesis Aggregator library?

Hello Team,

I am using this library to store data from Kinesis stream to S3.

I appreciate the overall design of the library, but then I want to aggregate some data (e.g. calculate sum of timespan group by visitor id) before uploading it to S3.

After some googling I found another library, https://github.com/awslabs/amazon-kinesis-aggregators, but I am not sure how to use it together with the connector library.

I guess I should follow the instructions under this section: https://github.com/awslabs/amazon-kinesis-aggregators#integrating-aggregators-into-existing-applications

but where should I put the aggregate(), checkpoint(), initialize(), and shutdown() calls? Which class(es) should I extend to do so?

Hibernate - Redshift.

Hi, I have an application developed using Struts and Hibernate. Now I am trying to connect to AWS Redshift. My settings are as follows (in the Hibernate cfg file):
1)driver_class: org.postgresql.Driver
connection.url: jdbc:postgresql://host_name.redshift.amazonaws.com:5439/user
dialect: org.hibernate.dialect.PostgreSQL82Dialect

2)driver_class: com.amazon.redshift.jdbc4.Driver
connection.url: jdbc:redshift://host_name.redshift.amazonaws.com:5439/user
dialect: org.hibernate.dialect.PostgreSQL82Dialect (using this since I don't know the dialect name here)

In both cases I am getting:
Error: Could not open connection: No suitable driver found for jdbc:postgresql:

I have also added the Redshift JDBC and PostgreSQL jars. I have no idea what's wrong; can anybody help me with this?

Incorrect bufferByteSizeLimit Calculation for Batched Kinesis Records

In some cases, data is batched on the client side before sending it to Kinesis via a PutRecord request. (For instance, the data blob may be a json array containing objects that should each be handled separately. I'm doing something like this, and looking to store each object on its own line in a newline-delimited json file on S3.) The connector library supports this by including both ITransformer and ICollectionTransformer interfaces and the processRecords method in KinesisConnectorRecordProcessor contains the following:

...
if (transformer instanceof ITransformer) {
    ITransformer<T, U> singleTransformer = (ITransformer<T, U>) transformer;
    filterAndBufferRecord(singleTransformer.toClass(record), record);
} else if (transformer instanceof ICollectionTransformer) {
    ICollectionTransformer<T, U> listTransformer = (ICollectionTransformer<T, U>) transformer;
    Collection<T> transformedRecords = listTransformer.toClass(record);
    for (T transformedRecord : transformedRecords) {
        filterAndBufferRecord(transformedRecord, record);
    }
}
...

And filterAndBufferRecord is implemented as:

private void filterAndBufferRecord(T transformedRecord, Record record) {
    if (filter.keepRecord(transformedRecord)) {
        buffer.consumeRecord(transformedRecord, record.getData().array().length, record.getSequenceNumber());
    }
}

Notice that filterAndBufferRecord uses record.getData().array().length as the size of the record. So in the case of transformer being an ICollectionTransformer implementation, each sub-record is passed to buffer.consumeRecord but with the size of the whole batch instead of the individual sub-record. This means the buffer will flush too often because the size it's keeping track of is inflated.

My workaround for now is just setting bufferByteSizeLimit to Integer.MAX_VALUE, and relying on bufferRecordCountLimit to flush the buffer.
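
A hedged sketch of the alternative accounting described above (this is not the library's code; sizeOf is a hypothetical helper whose implementation depends on your data model, e.g. the length of the sub-record's serialized form):

// Sketch only: charge each transformed sub-record for its own approximate size
// instead of the whole Kinesis record's byte count.
private void filterAndBufferRecord(T transformedRecord, Record record) {
    if (filter.keepRecord(transformedRecord)) {
        int approximateBytes = sizeOf(transformedRecord);   // hypothetical per-sub-record size
        buffer.consumeRecord(transformedRecord, approximateBytes, record.getSequenceNumber());
    }
}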

Uncertain How to Use `redshiftbasic` Example

I'd like to use the redshiftbasic example in order to hook up my Kinesis Stream to dump into Redshift:

  • either every N minutes
  • when the bucket fills up to N bytes
  • etc.

It looks like, per the docs, that using the redshiftbasic example is the way to go.

However, I'm not sure how to use this code.

Do I simply need to take the children of the folder, amazon-kinesis-connectors/src/main/samples/redshiftbasic? Then, I need to modify a few of the source files, such as KinesisMessageModelRedshiftTransformer.java, to match my data-set?

When I run the build script via ant, do I run it once and then the Kinesis stream is wired up to always talk to Redshift?

Or does the program run continuously? If it stops (let's say due to CTRL + Z), do I no longer get the functionality of Kinesis dumping into Redshift?

Also - how do I compile this project? Thanks

Runtime Exceptions allow buffer to grow unbounded

Hi,

If a runtime exception happens in our emitter code (preventing buffer.clear()) it bubbles out and is swallowed in the KCL ProcessTask.java. The KCL continues feeding new records to the buffer. This allows the buffer to grow unbounded.

Then if the application recovers (before OutOfMemory), the large buffer is flushed all at once.

Would be great if there was ability to:

  1. Pause processing/respond to backpressure if we detect some issue, and resume once it goes away
  2. Have KinesisConnectorRecordProcessor.java's transformToOutput(buffer) use the buffer configuration to cap how much data is flushed, i.e. only output n items, or n bytes of items, at a time.

/third-party does not exist. BUILD FAILED

hadoop@ip-10-102-180-39:/mnt/streamingtest/amazon-kinesis-connectors/src/main$ ant -buildfile samples/s3/build.xml
Buildfile: /mnt/streamingtest/amazon-kinesis-connectors/src/main/samples/s3/build.xml

run:
[mkdir] Created dir: /mnt/streamingtest/amazon-kinesis-connectors/src/main/build
[javac] /mnt/streamingtest/amazon-kinesis-connectors/src/main/samples/s3/build.xml:45: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
[javac] Compiling 56 source files to /mnt/streamingtest/amazon-kinesis-connectors/src/main/build

BUILD FAILED
/mnt/streamingtest/amazon-kinesis-connectors/src/main/samples/s3/build.xml:45: /third-party does not exist.

Total time: 0 seconds

Missing Elasticsearch.template CloudFormation script for Elasticsearch sample

Would it be possible to get the "Elasticsearch.template" CloudFormation script referenced in src/main/samples/elasticsearch/ElasticsearchSample.properties added?

It is referenced as being available in the blog post announcing the Elasticsearch connector:

You can use the CloudFormation template in our sample to quickly create an Elasticsearch cluster on Amazon Elastic Compute Cloud (EC2), fully managed by Auto Scaling.

Emitters should be more extensible

I'm working on a project where we need to customize the library a bit. This is quite hard to do because lots of methods are private or some fields are final.

Some examples:

Problem 1: Partition key

  • Requirement: Use a custom partition key to publish files to the manifest stream. This may be required when you need to run COPY commands in parallel to different tables.
//S3ManifestEmitter.java
// Use constant partition key to ensure file order
putRecordRequest.setPartitionKey(manifestStream);
  • Problem: To change that piece of code, we had to copy-paste the whole emit method.

Problem 2: Mock S3Client

  • Requirement: I want to unit test a module that extends S3Emitter. For that, I want to mock AmazonS3Client so it does not call the real service.
// S3Emitter.java
public S3Emitter(KinesisConnectorConfiguration configuration) {
    s3Bucket = configuration.S3_BUCKET;
    s3Endpoint = configuration.S3_ENDPOINT;
    s3client = new AmazonS3Client(configuration.AWS_CREDENTIALS_PROVIDER);
    if (s3Endpoint != null) {
        s3client.setEndpoint(s3Endpoint);
    }
}
  • Problem: There is no way to mock the S3Client because it is created inside the constructor. There is no constructor that receives the client. I ended up copy-pasting the whole class.

Incorrect handling of exceptions during checkpointing.

I have taken the S3 emitter sample and modified it to our needs. The core library code (under src/main/java) is unmodified. I left it running overnight for several nights for stability testing and encountered the following in the logs one morning (and have been unable to reproduce since):

2014-03-25 03:45:04,105 63555519 INFO  [pool-2-thread-2] com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker abcd:-1234:efgh:-8000 saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
2014-03-25 03:45:04,198 63555612 INFO  [pool-2-thread-2] com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker abcd:-1234:efgh:-8000 successfully took 1 leases: shardId-000000000000
2014-03-25 03:45:06,339 63557753 INFO  [pool-2-thread-2] com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker abcd:-1234:efgh:-8000 saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
2014-03-25 03:45:06,431 63557845 INFO  [pool-2-thread-2] com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker abcd:-1234:efgh:-8000 successfully took 1 leases: shardId-000000000000
[snip]
2014-03-25 03:49:13,416 63804830 INFO  [pool-1-thread-3] com.mycompany.mypackage.S3Emitter - Successfully emitted 1002 records to S3 in s3://my-bucket/my-stream/2014-03-25/03/startid-stopid
2014-03-25 03:49:13,450 63804864 INFO  [pool-1-thread-3] com.amazonaws.services.kinesis.leases.impl.LeaseRenewer - Worker abcd:-1234:efgh:-8000 refusing to update lease with key shardId-000000000000 because concurrency tokens don't match
2014-03-25 03:49:13,451 63804865 ERROR [pool-1-thread-3] com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor - com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard

Last set of messages kept repeating. It continued to process items, but Kinesis wasn't acknowledging the processing. When I restarted the program, it re-processed all of those items. I have no idea what caused the problem, as I have left it running for days on end with no problems, and the same levels of load going through Kinesis.

It appears that it was actually continuing to process new items instead of reprocessing the same set over and over again.

I have narrowed the problem down to KinesisConnectorRecordProcessor.emit(IRecordProcessorCheckpointer,List). https://github.com/awslabs/amazon-kinesis-connectors/blob/master/src/main/java/com/amazonaws/services/kinesis/connectors/KinesisConnectorRecordProcessor.java#L134

emitter.emit on line 139 did not encounter any problems and returned an empty list since nothing failed.
The call to checkpointer.checkpoint(); on line 153 threw a ShutdownException, which was then properly caught.
The call to emitter.fail(unprocessed) in the catch block on 157 passed in the empty list returned above.

It would seem to me that if checkpointer.checkpoint() throws, emitter.fail() should be called with all emitItems. Additionally, at least if a ShutdownException is encountered, it would be nice to attempt to re-initialize the Kinesis consumer.

In the meantime, we should be able to make do with terminating our program any time emitter.fail() is called, and have a supervisor simply restart the program when it terminates.
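
A hedged sketch of the handling proposed above, as a hypothetical helper rather than the library's actual emit method: when checkpoint() throws, every item that was just emitted is reported through fail(), instead of the empty unprocessed list. Package names are assumed from the stack traces on this page.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;

public class EmitThenCheckpoint {

    // Hypothetical helper illustrating the proposed behavior.
    public static <T> void emitThenCheckpoint(IEmitter<T> emitter,
                                              UnmodifiableBuffer<T> buffer,
                                              IRecordProcessorCheckpointer checkpointer) throws IOException {
        List<T> emitted = new ArrayList<T>(buffer.getRecords());   // remember what we emit
        List<T> unprocessed = emitter.emit(buffer);
        if (!unprocessed.isEmpty()) {
            emitter.fail(unprocessed);
        }
        try {
            checkpointer.checkpoint();
        } catch (Exception e) {             // e.g. ShutdownException
            emitter.fail(emitted);          // proposed: fail all emitted items, not an empty list
        }
    }
}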

Upgrade to a newer Elasticsearch version?

The requirements state that the Elasticsearch connector depends on Elasticsearch 1.2.1.

But that version is about a year old, and they're now all the way up to 1.6.x.

Not sure if it's an easy change or fairly involved to upgrade.

Application logs

We would like to ship the application log (for this KCL app) to CloudWatch. Does it generate any specific log file? If not, can I at least add one when running the app?

PostgreSQL driver for redshiftmanifest sample

From the directions:

https://github.com/awslabs/amazon-kinesis-connectors/blob/master/src/main/samples/redshiftmanifest/build.xml#L4

Downloaded the PG jdbc driver http://jdbc.postgresql.org/download.html and put in the following directory:

src/jdbc/lib/postgresql-9.3-1101.jdbc41.jar

Added the driver path to build.xml

<path id="samples.classpath">
  /* removed */
  <fileset dir="${basedir}/../jdbc/lib" includes="**/*.jar" />
</path>

In RedshiftManifestSample.properties I have the URL:

redshiftURL = jdbc:postgresql://HOST_NAME_REMOVED.us-east-1.redshift.amazonaws.com

When I run the program with ant run I see the following:

[java] Mar 28, 2014 1:21:48 PM samples.utils.RedshiftUtils tableExists
[java] SEVERE: java.sql.SQLException: No suitable driver found for jdbc:postgresql://HOST_NAME_REMOVED.us-east-1.redshift.amazonaws.com
[java] Exception in thread "main" java.lang.IllegalStateException: Could not create Redshift file table kinesisFiles
[java]  at samples.KinesisConnectorExecutor.createRedshiftFileTable(Unknown Source)
[java]  at samples.KinesisConnectorExecutor.setupAWSResources(Unknown Source)
[java]  at samples.KinesisConnectorExecutor.<init>(Unknown Source)
[java]  at samples.redshiftmanifest.RedshiftManifestExecutor.<init>(Unknown Source)
[java]  at samples.redshiftmanifest.RedshiftManifestExecutor.main(Unknown Source)
[java] Caused by: java.sql.SQLException: No suitable driver found for jdbc:postgresql://ht-redshift-test.chzuqbjvtyx7.us-east-1.redshift.amazonaws.com
[java]  at java.sql.DriverManager.getConnection(DriverManager.java:596)
[java]  at java.sql.DriverManager.getConnection(DriverManager.java:187)
[java]  at samples.utils.RedshiftUtils.createRedshiftTable(Unknown Source)
[java]  ... 5 more

Emitter for Amazon Elasticsearch Service

For writing to Amazon Elasticsearch Service, we should add Signature Version 4 signing to our requests. Do you plan to support that? If not, I'll try to implement it. The basic idea is implementing an AmazonElasticsearchServiceEmitter for requests to Amazon Elasticsearch Service. ElasticsearchObject and ElasticsearchTransformer may work well with Amazon Elasticsearch Service too.

`Unable to load AWS credentials from any provider in the chain` error message when try and run any of the samples

I've been trying unsuccessfully to run the different samples e.g. the S3 sample.

To do that, I have:

  1. Created an AwsCredentials.properties file in each of the sample folders (e.g. amazon-kinesis-connector/src/main/samples/s3/AwsCredentials.properties), with accessKey and secretKey fields set. (Note that this is an exact copy of the AwsCredentials.properties file I used to run the aws-java-sdk-1.6.12 samples successfully.
  2. Copied the aws-java-sdk-1.6.12 and KinesisClientLibrary into the locations specified in the build.xml file. (As an aside - this meant copying the libraries into amazon-kinesis-connector/src/aws-java-sdk-1.6.12/ and amazon-kinesis-connector/src/KinesisClientLibrary/lib/amazon-kinesis-client-1.0.0.jar which was a bit weird...)
  3. Running ant run from the command line in e.g. amazon-kinesis-connector/main/samples/s3

I keep getting the following error:

[java] Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
     [java]     at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
     [java]     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2120)
     [java]     at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:296)
     [java]     at samples.utils.KinesisUtils.streamExists(Unknown Source)
     [java]     at samples.utils.KinesisUtils.createAndWaitForStreamToBecomeAvailable(Unknown Source)
     [java]     at samples.utils.KinesisUtils.createInputStream(Unknown Source)
     [java]     at samples.KinesisConnectorExecutor.setupAWSResources(Unknown Source)
     [java]     at samples.KinesisConnectorExecutor.<init>(Unknown Source)
     [java]     at samples.s3.S3Executor.<init>(Unknown Source)
     [java]     at samples.s3.S3Executor.main(Unknown Source)
     [java] Java Result: 1

It seems like the AwsCredentials.properties file is not being loaded... Is that right? What am I doing wrong? Any help would be much appreciated

Monitoring and rescheduling

Hi,
I am trying to implement the Kinesis-S3 connector. My question is how I can monitor the job (from a production perspective) and, in case of any exception, how I can take action (send mail, reschedule the job).
Is CloudWatch a good option? What are other recommended ways?

Thanks
Ankur

Add a fail callback when the message can not be transformed.

The emitter interface contains a #fail(List records) method. That's quite useful because if for some reason one message cannot be emitted after N retries, we just publish that to an SQS dead-letter queue.

We would like to do the same but also when the message cannot be transformed to T. At the moment, if the transformer throws an exception, the KinesisConnectorRecordProcessor class just logs the exception and that's it.

It would be cool if, in addition to logging the exception, it could call a callback on the transformer (or on whichever interface you think is better), so we can do something with the messages that cannot be transformed (for example, invalid JSON, or JSON missing a field that the pipeline assumes is present).

Compiling

I've spent a ton of time trying to get anything to build with this project. There are no clear instructions to build the sample application. You try, and then you will find that you need the specific jar file of the Kinesis Client, placed into a specific location, from here: #1

Then, you find that it still fails because it wants an "external" directory in the source tree. Create that empty dir, and try again.

Next, the problem is 29 errors, e.g.:

ElasticsearchEmitter.java:241: error: cannot find symbol
    [javac]         } else if (response.getStatus().equals(ClusterHealthStatus.GREEN)) {
    [javac]                                                ^
    [javac]   symbol:   variable ClusterHealthStatus
    [javac]   location: class ElasticsearchEmitter   

Another possible clue:

warning: Supported source version 'RELEASE_6' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.7'

I am using Java 1.7, but given this error, I even tried version 1.6, which did not resolve it. I also tried both the OpenJDK and the Oracle JDK.

All I want to do is compile a sample file and move on, pulling out my hair and stabbing my eyeballs out in frustration here.....

I'm getting an AWSCredentials.properties file not found error

I ran the following command:

/opt/storm/bin/storm jar /shared/KinesisStormSpout27.jar SampleTopology sample.properties RemoteMode

Getting an error message on

Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

I checked my KinesisStormSpout27.jar file package, which I built with Eclipse, converted to a Maven Java Project, and loaded dependencies, and I know and verified that this "missing" class is there in the packaged jar, inside aws-java-sdk-1.7.13.jar.

Any help is appreciated.

Sam

See complete error below:

[vagrant@supervisor1 ~]$ /opt/storm/bin/storm jar /shared/KinesisStormSpout27.jar SampleTopology sample.properties RemoteMode
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm/lib/minlog-1.2.jar:/opt/storm/lib/tools.macro-0.1.0.jar:/opt/storm/lib/servlet-api-2.5-20081211.jar:/opt/storm/lib/compojure-1.1.3.jar:/opt/storm/lib/kryo-2.21.jar:/opt/storm/lib/tools.logging-0.2.3.jar:/opt/storm/lib/commons-io-2.4.jar:/opt/storm/lib/httpclient-4.3.3.jar:/opt/storm/lib/slf4j-api-1.6.5.jar:/opt/storm/lib/log4j-over-

AWS ES service version conflict

The AWS ES service is using ES 1.5.2, but your library depends on 1.2.1.
There is an exception when I try to connect to the AWS ES service.

org.elasticsearch.client.transport.NoNodeAvailableException: No node available

Any thoughts on this? I am not sure if I am right.

ElasticSearch connector does not work against AWS-ES service

It appears that the connector does not work with ES as managed by AWS. Although the initial connection appears to be successful (using the endpoint at port 443), the first call after that makes it disconnect again:

2016-05-12 12:40:49,485 @main INFO ElasticsearchEmitter ElasticsearchEmitter using elasticsearch endpoint search-<...>.eu-west-1.es.amazonaws.com:443 ElasticsearchEmitter.java(118)
2016-05-12 12:40:49,640 @main INFO plugins [Leap-Frog] loaded [], sites [] PluginsService.java(151)
2016-05-12 12:41:44,595 @LeaseCoordinator-1 INFO LeaseTaker Worker cfe43d56bde02857:-651163f0:154a48db5ff:-8000 saw 36 total leases, 35 available leases, 1 workers. Target is 36 leases, I have 1 leases, I will take 35 leases LeaseTaker.java(403)
2016-05-12 12:42:51,789 @main INFO transport [Leap-Frog] failed to get node info for [#transport#-1][pbox-VirtualBox][inet[search-<...>.eu-west-1.es.amazonaws.com/52.17.232.195:443]], disconnecting... TransportClientNodesService.java(398)
org.elasticsearch.transport.SendRequestTransportException: [][inet[search-<...>.eu-west-1.es.amazonaws.com/52.17.232.195:443]][cluster:monitor/nodes/info]
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:286)
at org.elasticsearch.transport.TransportService.submitRequest(TransportService.java:243)
at org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:376)

A similar issue is reported @ amazon-archives/cloudwatch-logs-subscription-consumer#9

Perhaps the elasticsearch-hadoop (spark) receiver gives some hints how to approach this: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/reference.html; specifically the option:

conf.set("es.nodes.wan.only", "true"); (https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html)

unicode handling

Either the attached code or the ant configuration does not properly handle unicode characters when writing to output files. Did anyone experience this issue and find appropriate lines of code that need to be modified to handle unicode characters?

amazon redshift connector deployment

Hi All,

I've managed to get the amazon redshift connector running locally on my virtual machine, however, I would like to use this library as part of our production deployment.

Does anyone have an example of what the deployment topology would look like? For example, what is the throughput to Redshift when running this library on a single EC2 instance? Does it need to be deployed using a distributed mechanism? If it can be distributed, will it integrate with Apache Spark (Streaming)?

Thanks in advance for any help, Mike.

Handling duplicate records

We are using the latest build to read messages from a multi-shard Kinesis stream using workers running across multiple EC2 instances and write them to S3. In this context, please see the following doc:

http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-duplicates.html

It talks about cases where we can end up with reading duplicate records from the stream. The S3Emitter uses the buffer.firstSeqNumber and buffer.lastSeqNumber to determine the S3 filename. In the case of retries, this approach might not guarantee duplicate removal as the buffer can be filled differently with a different size or count. The approach suggested by the doc does not work in the case of S3Emitter as there is no way to control the exact number of records in the buffer (due to the emit being based on count, time OR size). Any thoughts on how we can overcome this limitation?

Thanks.

ITransformer: Using java.lang.Exception instead of IOException

Hi

I feel that defining toClass & fromClass to throw IOException is very constraining.
In case I am performing some validation and want to throw a custom exception, I cannot do that.
Also, the KinesisConnectorRecordProcessor only handles IOExceptions; it doesn't even handle other runtime exceptions that may occur, and that is not a valid reason to stop processing the rest of the records.
Hence I request that you change this to use java.lang.Exception, or at the very least use RuntimeExceptions, everywhere.

Unable to load AWS credentials from any provider in the chain

I am trying to run the RedshiftBasic sample and I have included aws.accessKeyId and aws.secretKey properties at the end of the RedshiftBasicSample.properties file per the instructions. When I run 'ant run', the following trace comes out:

run:
    [javac] /Users/andrewguy/www/amazon-kinesis-connectors/src/main/samples/redshiftbasic/build.xml:48: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
    [javac] Compiling 28 source files to /Users/andrewguy/www/amazon-kinesis-connectors/src/main/build
    [javac] warning: Supported source version 'RELEASE_6' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.8'
    [javac] 1 warning
     [java] Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
     [java]     at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
     [java]     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2486)
     [java]     at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:861)
     [java]     at samples.utils.KinesisUtils.streamExists(Unknown Source)
     [java]     at samples.utils.KinesisUtils.createAndWaitForStreamToBecomeAvailable(Unknown Source)
     [java]     at samples.utils.KinesisUtils.createInputStream(Unknown Source)
     [java]     at samples.KinesisConnectorExecutor.setupAWSResources(Unknown Source)
     [java]     at samples.KinesisConnectorExecutor.<init>(Unknown Source)
     [java]     at samples.redshiftbasic.RedshiftBasicExecutor.<init>(Unknown Source)
     [java]     at samples.redshiftbasic.RedshiftBasicExecutor.main(Unknown Source)
     [java] Java Result: 1

BUILD SUCCESSFUL
Total time: 3 seconds

What am I missing?

Thanks.

Kinesis back-pressure applied from the KCL

If my emitter is down for some reason, I see the size of my buffer growing larger and larger until my VM runs out of memory. The reason is the record processor continues to read records from Kinesis, regardless of the size of my buffer. This separate thread is reading the records from Kinesis and draining to my buffer continuously, whether I can emit the records at that moment or not.

Is there some way to provide back-pressure information to the thread that is polling Kinesis for additional records in these cases? Wouldn't any process that emits records much slower than they are read from Kinesis end up in an OoM situation?

We use batch semantics in our emitter to write to Elasticsearch which can handle about a 10MB payload, so we restrict our in-memory buffer to 10MB. However, if Elasticsearch is down for some reason for a period of a minute or so, our buffer grows to 100MB in size. Doesn't anyone who uses the KCL have to consider segmenting the records passed to their emitter because they could be passed, many, many more records than they expect given their buffer configuration parameters?

S3Emitter to allow folders

We are currently using the S3Emitter with the KCL to transfer data to a bucket. The current file format is <firstSeq>-<lastSeq>.

The bucket grows to contain thousands+ files really quickly and we found it really useful to prefix the file format with yyyy/MM/dd/HH/.

Is this something the AWS team would find useful and be interested in adding to this project?
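
A sketch of one way to do this, assuming your version of S3Emitter exposes a protected getS3FileName(firstSeq, lastSeq) hook; if it does not, the emit method itself would have to be overridden.

import java.text.SimpleDateFormat;
import java.util.Date;

import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.s3.S3Emitter;

// Sketch only: prefix S3 object keys with a yyyy/MM/dd/HH/ "folder" path.
public class DatePrefixedS3Emitter extends S3Emitter {

    public DatePrefixedS3Emitter(KinesisConnectorConfiguration configuration) {
        super(configuration);
    }

    @Override
    protected String getS3FileName(String firstSeq, String lastSeq) {
        String prefix = new SimpleDateFormat("yyyy/MM/dd/HH/").format(new Date());
        return prefix + firstSeq + "-" + lastSeq;   // e.g. 2015/06/23/14/<firstSeq>-<lastSeq>
    }
}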

HTTP proxy support

I am wondering if the library supports an HTTP proxy. We are trying to use the ES connector with the AWS CloudWatch consumer, and for some reason the API is unable to store the lease information in DynamoDB. I get an HTTP connection timed out error.

I tried passing the proxy settings through JAVA_OPTIONS in the call to "java -jar" as system properties, but that doesn't seem to make any difference.

Any known fixes for this?

Here's the stack trace:

2016-02-20 02:57:10,741 INFO AmazonHttpClient - Unable to execute HTTP request: connect timed out
java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:656)
at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:524)
at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:403)
at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:118)
at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:177)
at org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:304)
at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:611)
at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:446)
at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:706)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:467)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:3240)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:1047)
at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:373)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:317)
at com.amazonaws.services.kinesis.connectors.KinesisConnectorExecutorBase.run(KinesisConnectorExecutorBase.java:95)
at com.amazonaws.services.logs.connectors.samples.elasticsearch.ElasticsearchConnector.main(ElasticsearchConnector.java:38)

Thanks

Time-based buffer sample

In the samples provided, we have the following property defined in the s3.properties file:

bufferMillisecondsLimit = 3600000

Flush when buffer exceeds 25 Amazon Kinesis records, 1 MB size limit or when time since last buffer exceeds 1 hour

This property seems to indicate that the buffer's contents will be emitted when the configured time elapses (among other limits). However, I do not see this property being referred to anywhere in the source. Even BasicMemoryBuffer.java uses only the other two limits (count & size) and not this time-based property. What would be the right place to implement the time-based emitting of records from the buffer? I checked KinesisConnectorRecordProcessor.java, but feel that checkpointing would be difficult without some global state. Any pointers would greatly help.
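
One place to put the time check, sketched below under assumptions: extend BasicMemoryBuffer so that shouldFlush() also fires when a configured interval has elapsed since the last clear(). (Per the release notes above, a time threshold was added to IBuffer in release 1.1, so newer versions may already cover this.) The one-hour constant mirrors the bufferMillisecondsLimit value quoted above; getRecords(), shouldFlush(), and clear() are IBuffer methods.

import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.impl.BasicMemoryBuffer;

// Sketch only: add a time-based flush on top of BasicMemoryBuffer's count and size thresholds.
public class TimeAwareMemoryBuffer<T> extends BasicMemoryBuffer<T> {

    private final long millisecondsLimit = 3600000L;            // 1 hour, as in s3.properties
    private long lastFlushTime = System.currentTimeMillis();

    public TimeAwareMemoryBuffer(KinesisConnectorConfiguration configuration) {
        super(configuration);
    }

    @Override
    public boolean shouldFlush() {
        long elapsed = System.currentTimeMillis() - lastFlushTime;
        boolean timeExceeded = !getRecords().isEmpty() && elapsed >= millisecondsLimit;
        return super.shouldFlush() || timeExceeded;
    }

    @Override
    public void clear() {
        super.clear();                                           // the record processor clears after each flush
        lastFlushTime = System.currentTimeMillis();
    }
}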
