

AWS Streams to Kinesis Firehose Forwarder

Please note that this project is now deprecated for Kinesis Streams sources. For forwarding Kinesis Streams data to Kinesis Firehose, you can now configure Kinesis Streams as a source directly. You may still find this project useful for forwarding DynamoDB Update Streams to Kinesis Firehose.

Amazon Kinesis Firehose simplifies delivery of streaming data to Amazon S3 and Amazon Redshift as a simple, automatically scaled service with zero operational overhead. Where customers have existing systems built on streaming interfaces, adding Firehose enables simple archival, or facilitates long-term analysis of data in Amazon Redshift. Integration can be accomplished by using the Kinesis Agent to automatically publish file data to Amazon Kinesis Streams and/or Amazon Kinesis Firehose delivery streams. This project includes an AWS Lambda function that enables customers who are already using Amazon Kinesis Streams for real-time processing to take advantage of Amazon Kinesis Firehose. Furthermore, if you are using Amazon DynamoDB and would like to store a history of changes made to a table, this function can push those events to Amazon Kinesis Firehose.

(Architecture diagram: StreamToFirehose)

Pre-requisites

In order to effectively use this function, you should already have configured an Amazon Kinesis Stream or an Amazon DynamoDB Table with Update Streams, as well as an Amazon Kinesis Firehose Delivery Stream of the correct name. For Amazon Kinesis Streams, please ensure that producer applications can write to the Stream, and that the Amazon Kinesis Firehose Delivery Stream is able to deliver data to Amazon S3 or Amazon Redshift. This function makes no changes to Stream or Firehose configurations. You must also have the AWS Command Line Interface (https://aws.amazon.com/cli) installed to take advantage of the Stream Tagging utility supplied.

Usage

  1. Clone the repository
  2. Run build.sh
  3. Upload the zip file from the dist directory to your Lambda function

Configuration

This Lambda function can map stream sources to Kinesis Firehose Delivery Streams in a few different ways (listed in order of preference):

  • Manually specified configuration (see deliveryStreamMapping in index.js:78); an example sketch follows this list
  • A DynamoDB Stream naming convention that determines which Delivery Stream to forward to
  • A Kinesis Stream tagging convention
  • (Optionally) A default delivery stream
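
For reference, the manual mapping is a plain object from source stream name to Delivery Stream name. A minimal sketch, assuming hypothetical stream and table names of your own (only the 'DEFAULT' entry ships with the function):

// index.js - example values; all entries except 'DEFAULT' are hypothetical
var deliveryStreamMapping = {
    'DEFAULT' : 'LambdaStreamsDefaultDeliveryStream',
    'my-kinesis-stream' : 'my-firehose-delivery-stream', // hypothetical Kinesis Stream source
    'MyDynamoTable' : 'MyDynamoTable'                    // hypothetical DynamoDB table with a Firehose of the same name
};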

Using the Default Delivery Stream

In order to make sure that data will always be accepted by a Kinesis Firehose Delivery Stream, this Lambda function can fall back to a default Delivery Stream if neither manual configuration nor any other lookup yields a result.

This can be particularly helpful when developing and testing the integration of new data sources. In such cases you could use the Default Delivery Stream to forward data to an S3 bucket with a one-day retention period, as specified in an S3 Lifecycle Policy.

The Default Delivery Stream is enabled by default in the Lambda function; to use it, however, a Kinesis Firehose Delivery Stream with a matching name must exist. You can use the createDefaultDeliveryStream.sh script to orchestrate its creation.
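
If you prefer the SDK over the shell script, a minimal sketch of the equivalent call is shown below; the role and bucket ARNs are placeholders that you must replace with your own:

var AWS = require('aws-sdk');
var firehose = new AWS.Firehose({ region : 'us-east-1' });

firehose.createDeliveryStream({
    DeliveryStreamName : 'LambdaStreamsDefaultDeliveryStream',
    S3DestinationConfiguration : {
        // hypothetical ARNs - substitute your own delivery role and archive bucket
        RoleARN : 'arn:aws:iam::123456789012:role/firehose-delivery-role',
        BucketARN : 'arn:aws:s3:::my-default-archive-bucket'
    }
}, function(err, data) {
    if (err) { console.log(err, err.stack); }
});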

Note: We recommend the usage of default delivery streams only for non-production workloads. They can be disabled by setting USE_DEFAULT_DELIVERY_STREAMS = false (see index.js:70).

Specifying a Delivery Stream for a Kinesis Stream Source

If Amazon Kinesis Streams are the source, the Delivery Stream can be specified in configuration, or tags can be used to specify the Delivery Stream target. To tag the Stream for Amazon Kinesis Firehose delivery, simply run the tagStream.sh script:

tagStream.sh <My Kinesis Stream> <My Firehose Delivery Stream> <region>
where
	<My Kinesis Stream> - The Amazon Kinesis Stream for which an event source has been created to the Forwarder Lambda function
	<My Firehose Delivery Stream> - The Amazon Kinesis Firehose Delivery Stream which you've configured to deliver to the required destination
	<region> - The region in which the Kinesis Stream & Firehose Delivery Stream have been created. Today only single region operation is permitted

This will add a new Stream Tag named ForwardToFirehoseStream on the Kinesis Stream with the value you supply. This is limited to delivery in the same region as the Kinesis Stream or DynamoDB table. You can run the script any time to update this value. To view the Tags configured on the Stream, simply run aws kinesis list-tags-for-stream --stream-name <My Kinesis Stream> --region <region>

Specifying a Delivery Stream for a DynamoDB Stream Source

If you are using Amazon DynamoDB, then you can either use manual configuration, or the Firehose Delivery Stream must have the same name as the Amazon DynamoDB Table.

Deploying

To use this function, simply deploy the LambdaStreamToFirehose-1.4.0.zip to AWS Lambda with handler index.handler. You must ensure that it is deployed with an invocation role that includes the ability to write to Amazon CloudWatch Logs, read from Amazon Kinesis or Amazon DynamoDB Streams, and write to Amazon Kinesis Firehose:

{
  "Statement": [
    {
      "Action": [
        "logs:*"
      ],
      "Effect": "Allow",
      "Resource": [
        "*"
      ],
      "Sid": "Stmt1446202596000"
    },
    {
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:ListStreams",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListTagsForStream"
      ],
      "Effect": "Allow",
      "Resource": [
        "*"
      ],
      "Sid": "Stmt1446202612000"
    },
    {
      "Action": [
        "firehose:DescribeDeliveryStream",
        "firehose:ListDeliveryStreams",
        "firehose:PutRecord",
        "firehose:PutRecordBatch"
      ],
      "Effect": "Allow",
      "Resource": [
        "*"
      ],
      "Sid": "Stmt1446202630000"
    },
    {
      "Action": [
        "dynamodb:DescribeStream",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams",
        "dynamodb:ListTables"
      ],
      "Effect": "Allow",
      "Resource": [
        "*"
      ],
      "Sid": "Stmt1447079825000"
    }
  ],
  "Version": "myversion"
}

You may choose to restrict the IAM role to be specific to a subset of Kinesis or DynamoDB Update Streams and Firehose endpoints.

Finally, create an Event Source (http://docs.aws.amazon.com/lambda/latest/dg/lambda-introduction.html) for this function from the Stream to be forwarded to Firehose.
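
If you prefer to script this step, a minimal sketch using the AWS SDK for JavaScript is shown below; the stream ARN is a placeholder (for DynamoDB, use the table's Stream ARN instead):

var AWS = require('aws-sdk');
var lambda = new AWS.Lambda({ region : 'us-east-1' });

lambda.createEventSourceMapping({
    // hypothetical ARN - substitute the Kinesis Stream or DynamoDB Stream to be forwarded
    EventSourceArn : 'arn:aws:kinesis:us-east-1:123456789012:stream/my-kinesis-stream',
    FunctionName : 'LambdaStreamToFirehose',
    StartingPosition : 'TRIM_HORIZON',
    BatchSize : 100
}, function(err, data) {
    if (err) { console.log(err, err.stack); }
});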

Optional Data Transformation

This Lambda function supports streaming transformation of your data. Kinesis data is base64-decoded before being transformed, and when DynamoDB Update Streams are being forwarded, the data will have the following structure:

{
    Keys,
    NewImage,
    OldImage,
    SequenceNumber,
    SizeBytes,
    ApproximateCreationDateTime,
    eventName
}

For more information on DynamoDB Update Streams, please read http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html.

By default, the function assumes that data is JSON, and will 'stringify' the data and append a newline character, so that files delivered to S3 are nicely formatted, and easy to load into Amazon Redshift. If your data is not JSON type, then you can add an environment variable to the deployed Lambda function 'StreamDatatype', which can be one of the following values:

  • CSV
  • CSV-WITH-NEWLINES
  • BINARY
  • JSON

If this setting is found, then the module will transform the data in the most appropriate way. CSV data will not be modified, but will have a new line character added before pushing to Kinesis Firehose. CSV-WITH-NEWLINES and BINARY are not modified in any way prior to forwarding.
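
For example, to declare a CSV payload you could set the environment variable on the deployed function with the SDK. A sketch, assuming the default function name; note that updateFunctionConfiguration replaces any environment variables already set on the function:

var AWS = require('aws-sdk');
var lambda = new AWS.Lambda({ region : 'us-east-1' });

lambda.updateFunctionConfiguration({
    FunctionName : 'LambdaStreamToFirehose',
    // declare that the stream carries CSV data, so only a newline is appended
    Environment : { Variables : { 'StreamDatatype' : 'CSV' } }
}, function(err, data) {
    if (err) { console.log(err, err.stack); }
});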

The function also provides a framework to write your own transformers. If you would like to modify the data after it's read from the Stream, but before it's forwarded to Firehose, then you can implement and register a new Javascript function with the following interface (ideally in transformer.js):

function(inputData, callback(err,outputData));

inputData: Object containing stream data. For Kinesis, the input is a base64 decoded string, while for DynamoDB it is a JSON object
callback: function to be invoked once transformation is completed, with arguments:
	err: Any errors that were found during transformation
	outputData: Buffer instance (typically 'ascii' encoded) which will be forwarded to Firehose
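
For illustration, a minimal sketch of such a transformer, which pipe-delimits two attributes of a JSON payload (the attribute names id and timestamp are hypothetical):

// hypothetical example transformer - extracts two attributes from a JSON record
// and emits a pipe-delimited line terminated by a newline
function jsonToPipeDelimited(inputData, callback) {
    try {
        // Kinesis data arrives as a base64-decoded string, DynamoDB as an object
        var record = (typeof inputData === 'string') ? JSON.parse(inputData) : inputData;
        var line = [ record.id, record.timestamp ].join('|') + '\n';
        callback(null, new Buffer(line, 'ascii'));
    } catch (e) {
        callback(e);
    }
}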

You then register this transformer function by assigning an instance of it to the exported transformer instance in the header of index.js:

// var useTransformer = transform.addNewlineTransformer.bind(undefined);
var useTransformer = myTransformerFunction.bind(undefined, <internal setup args>);

You can also take advantage of a built in regex-to-csv transformer, which can be used by un-commenting and configuring the following entry in the function:

// var transformer = exports.regexToDelimiter.bind(undefined, /(myregex) (.*)/, "|");

Where /(myregex) (.*)/ is the regular expression whose capture groups extract data from the input stream to export as delimited text, and "|" is the delimiter.
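
For example, assuming input records shaped like '2017-02-13 17:18:39 some message text' (a hypothetical format), the following configuration would forward '2017-02-13 17:18:39|some message text' to Firehose:

// hypothetical regex - capture a timestamp and the remainder of the line
var transformer = exports.regexToDelimiter.bind(undefined, /(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*)/, "|");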

Delivery Stream Routing

As stated previously, data will be forwarded on the basis of a Kinesis tag named ForwardToFirehoseStream, and if this isn't found, then it will fall back to a default delivery stream. DynamoDB update streams are always routed to the delivery stream with the same name as the base table.

In version 1.4.0, we added the ability to do dynamic routing. For example, you might want to route to different destinations on S3 or Redshift on the basis of the actual data being received. You can use this by overriding the default routing and providing a map of how records should be routed. You do this by changing the router.defaultRouting method to router.routeByAttributeMapping. When done, you need to provide an 'attribute delivery map' which tells the router which fields to look at in your data, and how to route based on their values. You do this with a configuration object - for example, to route by the value of an attribute binaryValue that can only be true or false:

var attributeMap = {
    "binaryValue" : {
        "true" : "TestRouting-route-A",
        "false" : "TestRouting-route-B"
    }
};

This attribute map is then used to configure the router instance:

var useRouter = router.routeByAttributeMapping.bind(undefined, attributeMap);

Please note this feature is only supported for JSON data in event payloads.

Filtering by Event Types

For use cases where the Lambda function needs to push only records of certain event types (INSERT, MODIFY, REMOVE), add an environment variable to the Lambda function:

  • Env variable name: WRITABLE_EVENT_TYPES
  • Env variable value: Comma-separated list of writable event types

Use-cases:

  • AppendOnly: Set the env variable value to just INSERT
  • UpsertOnly: Set the env variable value to INSERT,MODIFY

When this environment variable is not set, all types of events will be written.
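
Conceptually, the filter only forwards a record whose eventName appears in the configured list; a simplified sketch of that check (not the function's actual code) is:

// simplified illustration of event type filtering
var writableEventTypes = process.env.WRITABLE_EVENT_TYPES;

function isWritable(record) {
    // when the variable is not set, every event type is written
    if (!writableEventTypes) {
        return true;
    }
    return writableEventTypes.split(',').indexOf(record.eventName) !== -1;
}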

Confirming Successful Execution

When successfully configured, writes to your Stream will be automatically forwarded to the Firehose Delivery Stream, and you'll see data arriving in Amazon S3 and optionally Amazon Redshift. You can also view CloudWatch Logs for this Lambda function as it forwards stream data to Firehose.

Debugging & Creating New Builds

If you write a new transformer, you may wish to see debug logging in the CloudWatch Logs Stream. If so, then simply change the env variable DEBUG to true.

You will then need to rebuild and redeploy the function. To do this, first install the required dependencies with npm install, and then deploy a new version of the function with the build.sh script included in the repository. This will automatically redeploy the function using the name 'LambdaStreamToFirehose'. If you have deployed your function under a different name, then please update the name in build.sh.

Technical Bits

This function uses the putRecordBatch interface to Firehose to send 500 messages at a time with a max payload size of 4MB (as of 2015-11-02). The batches are processed serially so as to preserve the order of messages as they are received from the Stream.

Transformation creates another copy of input records, so you must plan accordingly when sizing the memory limit in AWS Lambda. Also consider that user defined transformers which significantly increase message size will need to be stored in memory before being dispatched to Firehose. If you need to limit the number of records for any reason, this can be set on the Event Source for your function.
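
The record limit is a property of the Event Source mapping rather than of this function; a sketch of lowering it with the SDK (the mapping UUID is a placeholder - you can find yours with lambda.listEventSourceMappings):

var AWS = require('aws-sdk');
var lambda = new AWS.Lambda({ region : 'us-east-1' });

lambda.updateEventSourceMapping({
    UUID : '12345678-90ab-cdef-1234-567890abcdef', // hypothetical mapping UUID
    BatchSize : 100
}, function(err, data) {
    if (err) { console.log(err, err.stack); }
});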

This function was written in and for node.js v0.10 and v0.12. Version 1.4.2 has been tested against Node.js 4.3, but the syntax is not ES6. We know this, and appreciate that there are more modern syntax options than what is used in the code. We are happy to accept pull requests that bring the codebase into the century of the fruitbat.


AWS Streams to Firehose

Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.


Contributors

athityakumar, benoittgt, cnees, daichiueura, dandalf, dependabot[bot], ericksonjoseph, herriojr, hyandell, ianmeyers, mbaran90, wagneraw, zetxek


lambda-streams-to-firehose's Issues

Not sure this project works

I've been trying to use this project without success.

I have a DynamoDB table whose name matches my table name in Redshift.

I've set up the DynamoDB table to stream to the Lambda function, to which I've uploaded this project's zip file, with the default stream stuff disabled.

I've set up a Kinesis Firehose connection to Redshift.

The issue is that the Redshift load fails because the data it is trying to load is not in any format Redshift expects. You reference line numbers all over the place in the documentation, but they don't match up because they were not updated with the code. I'm just not sure this code has ever been tested with DynamoDB streams, or works at all.

The attached file is the file it's attempting to load via the COPY command in redshift.
example.txt

Question on how to configure function for DDB Stream delivery to S3

To configure the module to run for DynamoDB Update Streams, simply:

  • Create a Firehose Delivery Stream with the same name as the DynamoDB Table in the same Region
  • Configure Update Streams on your DynamoDB table, ideally with OLD_AND_NEW_VERSIONS included
  • Deploy the LambdaStreamsToFirehose function to AWS Lambda
  • Create the Event Source for the Lambda function from the DynamoDB Table Update Stream

Understand how to easily forward streams across firehose

Hello

I dug into the code and read the README a few times. I'm trying to understand the best strategy for writing data into different Redshift tables without duplicating everything (Kinesis, Lambda, Firehose...).

I have one Redshift setup for the non-production environment and one for production.

Is there a way to easily forward data to the right Firehose?

Thanks

How to change delivery stream for existing input stream (using tags)?

Updating the ForwardToFirehoseStream tag for a Kinesis stream doesn't affect the delivery of the stream - it still delivers to the old firehose. From looking at the .js it seems the deliveryStreamMapping is redefined only at the start of the file before the handler. I guess this is really more of a Lambda question, but anyway: How can I get the Lambda function to pick up the updated delivery stream tag?

confusing data being written to S3 from the firehose

Hi!

I've got this function up and running - processing a DynamoDB stream into a Kinesis Firehose which then writes to S3. The only data I'm seeing in S3 is endless lines of this:

[object Object]
[object Object]
[object Object]
[object Object]
[object Object]
[object Object]
[object Object]

Any idea what I need to do in order to get meaningful data relating to the changes to DynamoDB records written out to S3?

Thanks!

Duplications when trying to switch to a new version

Hello

Recently I turned off the event source of the original lambda (lambda-streams-to-firehose), started a new lambda separately (fork-lambda-streams-to-firehose), then turned on the event source on fork-lambda-streams-to-firehose. The new lambda replayed all the data from the stream.

It ended up inserting the last 24h of data twice into Redshift. I thought that the lambda was changing the checkpoint of the data stream to track which records have been read. Am I correct?

Thanks in advance.

Code throws exception in line 450

When running the code in Lambda, it throws an exception at line 450 saying
"Undefined is not a function" on this line:
"if (event.Records[0].eventSource === KINESIS_SERVICE_NAME || event.Records[0].eventSource === DDB_SERVICE_NAME)"

This operation is not permitted on KinesisStreamAsSource delivery stream type.

My putRecordBatchParams:
{ "DeliveryStreamName": "Delivery-TTL-Teste", "Records": [ { "Data": { "type": "Buffer", "data": [ 123, 34, 75, 101, 121, 115, 34, 58, 123, 34, 115, 101, 115, 115, 105, 111, 110, 105, 100, 34, 58, 123, 34, 83, 34, 58, 34, 98, 56, 99, 52, 57, 97, 99, 56, 45, 97, 49, 56, 49, 45, 52, 100, 53, 98, 45, 57, 49, 98, 102, 45, 101, 101, 100, 97, 52, 100, 100, 99, 98, 98, 51, 50, 34, 125, 125, 44, 34, 79, 108, 100, 73, 109, 97, 103, 101, 34, 58, 123, 34, 67, 114, 101, 97, 116, 105, 111, 110, 84, 105, 109, 101, 34, 58, 123, 34, 78, 34, 58, 34, 49, 53, 52, 55, 54, 54, 51, 54, 49, 55, 34, 125, 44, 34, 83, 101, 115, 115, 105, 111, 110, 68, 97, 116, 97, 34, 58, 123, 34, 77, 34, 58, 123, 34, 82, 101, 116, 97, 105, 108, 101, 114, 34, 58, 123, 34, 83, 34, 58, 34, 115, 104, 111, 112, 114, 105, 116, 101, 34, 125, 44, 34, 84, 105, 109, 101, 79, 102, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 34, 58, 123, 34, 78, 34, 58, 34, 49, 53, 52, 55, 54, 54, 51, 54, 49, 55, 34, 125, 44, 34, 65, 109, 111, 117, 110, 116, 34, 58, 123, 34, 78, 34, 58, 34, 55, 56, 57, 56, 34, 125, 44, 34, 78, 97, 109, 101, 34, 58, 123, 34, 83, 34, 58, 34, 119, 105, 108, 108, 105, 97, 109, 34, 125, 125, 125, 44, 34, 115, 101, 115, 115, 105, 111, 110, 105, 100, 34, 58, 123, 34, 83, 34, 58, 34, 98, 56, 99, 52, 57, 97, 99, 56, 45, 97, 49, 56, 49, 45, 52, 100, 53, 98, 45, 57, 49, 98, 102, 45, 101, 101, 100, 97, 52, 100, 100, 99, 98, 98, 51, 50, 34, 125, 44, 34, 69, 120, 112, 105, 114, 97, 116, 105, 111, 110, 84, 105, 109, 101, 34, 58, 123, 34, 78, 34, 58, 34, 49, 53, 52, 55, 54, 54, 55, 50, 49, 55, 34, 125, 125, 44, 34, 83, 101, 113, 117, 101, 110, 99, 101, 78, 117, 109, 98, 101, 114, 34, 58, 34, 49, 57, 48, 50, 56, 53, 49, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 51, 55, 57, 57, 49, 50, 52, 55, 49, 34, 44, 34, 83, 105, 122, 101, 66, 121, 116, 101, 115, 34, 58, 50, 48, 53, 44, 34, 65, 112, 112, 114, 111, 120, 105, 109, 97, 116, 101, 67, 114, 101, 97, 116, 105, 111, 110, 68, 97, 116, 101, 84, 105, 109, 101, 34, 58, 49, 53, 52, 55, 54, 54, 55, 55, 55, 53, 44, 34, 101, 118, 101, 110, 116, 78, 97, 109, 101, 34, 58, 34, 82, 69, 77, 79, 86, 69, 34, 44, 34, 117, 115, 101, 114, 73, 100, 101, 110, 116, 105, 116, 121, 34, 58, 123, 34, 112, 114, 105, 110, 99, 105, 112, 97, 108, 73, 100, 34, 58, 34, 100, 121, 110, 97, 109, 111, 100, 98, 46, 97, 109, 97, 122, 111, 110, 97, 119, 115, 46, 99, 111, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 83, 101, 114, 118, 105, 99, 101, 34, 125, 125, 10 ] } } ] }

Erro:
2019-01-18T02:07:20.961Z fae1d2a0-12e2-4bfe-9864-1bb7a5da5cf3 { InvalidArgumentException: This operation is not permitted on KinesisStreamAsSource delivery stream type.
at Request.extractError (/var/runtime/node_modules/aws-sdk/lib/protocol/json.js:48:27)
at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:105:20)
at Request.emit (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:77:10)
at Request.emit (/var/runtime/node_modules/aws-sdk/lib/request.js:683:14)
at Request.transition (/var/runtime/node_modules/aws-sdk/lib/request.js:22:10)
at AcceptorStateMachine.runTo (/var/runtime/node_modules/aws-sdk/lib/state_machine.js:14:12)
at /var/runtime/node_modules/aws-sdk/lib/state_machine.js:26:10
at Request. (/var/runtime/node_modules/aws-sdk/lib/request.js:38:9)
at Request. (/var/runtime/node_modules/aws-sdk/lib/request.js:685:12)
at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:115:18)
message: 'This operation is not permitted on KinesisStreamAsSource delivery stream type.',
code: 'InvalidArgumentException',
time: 2019-01-18T02:07:20.961Z,
requestId: 'e1d8a349-23f4-bc6d-b563-94519964e77c',
statusCode: 400,
retryable: false,
retryDelay: 71.66140146807749 }

What is causing my problem? Any idea?

Is there any python code for this functionality

I'm trying to write this in python but when I'm testing my python code, the dynamic partition format structure I have configured for my delivery stream is not working as it is working for this repo in node.js. I have tried various options but nothing is working.

Duplicated records

Hello @IanMeyers and others

I'm getting random duplicate rows. For the last day:

"id","count"
2535282,2
2543816,2
2543817,2
2549680,2
2549679,2
2535281,2
2555470,2
2565819,2

I can see the duplicate entries in the S3 file but not when reading the Kinesis stream content.

Trying to investigate.

Access denied to s3 buckets housing LambdaStreamToFirehose-1.3.5.zip

An error occurred (AccessDeniedException) when calling the CreateFunction operation: Your access has been denied by S3, please make sure your request credentials have permission to GetObject for aws-lambda-streams-to-firehose-us-west-2/LambdaStreamToFirehose-1.3.5.zip. S3 Error Code: AccessDenied. S3 Error Message: Access Denied: ClientError
Traceback (most recent call last):

Compression support?

Is there any plan to add compression support when putting data into Firehose?

Since Firehose charges are incurred based on the volume of data ingress, it would behoove a rational economic actor to minimize the amount of data being sent to Firehose (and thus compress data beforehand rather than using the compression option in Firehose to minimize the amount of data being written to S3).

Problem with a Cloudwatch Logs Destination to Kinesis stream?

I am using a CWL Destination sending to a Stream and then using your Lambda to send it on to Firehose - S3, no Firehose compression or encryption.
The files that show up look like some strange unicode format. Is there an issue with using a Destination as a CWL subscription to Kinesis stream here?

Example file contents in S3:

"\u001f�\b\u0000\u0000\u0000\u0000\u0000\u0000\u00005��\n�@\u0014E�e�uD���\u000b1\u0017YB\n-\"bҗ>�\u0019�7\u0016\u0011�{c��p/��7��H\u0014��\u001a�\u001e��}z���.H�u\u0018�\u0011WO\t�O*��Oa�2R\u0005٠RE�U��l��h\u0010���^)��\u0018Tr��\u0001M�;����\u0001�����\u000fu�VÈڎM\u001dw>sW�r�p��_�\u00178F���~z\u001e�K��(\u000bV��L�ԍ�v\t����\u0016%\u0010\u0012��ژw��\u0003_��\u0003�\u0000\u0000\u0000"

Support for filtering by event types

Hey there! 👋

We're using this repo's code as a lambda to write DynamoDB input to Firehose stream output. Now we have come across an "AppendOnly" use-case, wherein we need to write only the INSERT events, and ignore the MODIFY/DELETE events.

We've implemented a change on top of this repo, adding writableEventTypes as a comma-separated environment variable to the lambda. Just wanted to check if anyone has a similar use-case, and if we could create a PR for the same to this repo.

Of course, if there's a more compatible way of adding this change, please let me know and I'll then open the PR. Just wanted to check that this is a compatible change with the maintainers of this repo, before going ahead to open a PR! 😄

And awesome work with this repo! TIA! 🎉

Typo in tagging script

Currently the "tagStream.sh" script has the line:

aws kinesis add-tags-to-stream --stream-name $1 --Tags ForwardToFirehoseStream=$2 --region $3

But according to the Kinesis docs, --Tags should actually be lower case: http://docs.aws.amazon.com/cli/latest/reference/kinesis/add-tags-to-stream.html.

It will give you this error unless you change to lower case:

usage: aws [options] <command> <subcommand> [<subcommand> ...] [parameters]
To see help text, you can run:

  aws help
  aws <command> help
  aws <command> <subcommand> help
aws: error: argument --tags is required

Simple bug fix, and easy enough to work around, just wanted to put it on your radar.

Btw, thanks for this repo, this fits perfectly for what my company is trying to do with streams.

Problems with international characters

Hi,

When dumping a message with international characters to an S3 Firehose, it seems the international characters get messed up in the resulting S3 file. When tailing the Kinesis Stream using the AWS CLI and base64-decoding the result to UTF-8, the result is correct. But when opening the resulting S3 file, the characters get replaced with something I haven't been able to decode.
Examples:
"Østover" becomes "C[CAN]stover" (CAN being the ASCII Cancel symbol)
"ELÅ2" becomes "ELC[ENQ]2" (ENQ being the ASCII Enquiry symbol)

Any thoughts on what could be wrong?

Filtering by event type

I'm in the process of setting up DynamoDB archiving and was using the guide here:
https://aws.amazon.com/blogs/database/automatically-archive-items-to-s3-using-dynamodb-time-to-live-with-aws-lambda-and-amazon-kinesis-firehose/

That referred me to this repository for pushing my TTL'd data into Firehose from Lambda. That works great!
However, I'm getting every change made to Dynamo rather than just the TTL'd docs. There is some verbiage in the link I provided above that indicates I can check for the event name, etc.

In this example, you archive all item changes to S3. If you wanted to archive only the items deleted by TTL, you could archive only the records where eventName is REMOVE and userIdentity contains PrincipalID equal to dynamodb.amazonaws.com.

Can you guys give me a snippet or some guidance on how to do this? I'm under the assumption I need to make a tweak to index file to handle this case.

Thanks!

Update:
I ended up just updating ln 467 to check for removal and principalId as follows:

if (record.eventName == "REMOVE" && record.userIdentity &&
    record.userIdentity.principalId == "dynamodb.amazonaws.com") {

    // dynamo update stream record
    var data = exports.createDynamoDataItem(record);
Would you guys have a better suggestion?

Add tests

Hello

I'm using this lambda in production and my team and I love to use code that is tested. I wonder if testing the behavior of this lambda is something other people want?

If yes, what would your requirements be? Local testing (i.e. mocking external calls with proxyquire, for example)?

Thanks

Update documentation on Lambda tests and handler

I found it difficult to start the code because I was missing a few helpful pieces that can be addressed in the README file. I would be happy to submit a pull request, but it seems easy enough to describe.

  1. The correct handler to use in the lambda function is "index.handler". This is the default, but it's not obvious what it should be in the README deployment section.
  2. (kinesis) When configuring the "test" section in the lambda tab, it is useful to mention that the sample event data template should be Kinesis, and that the "eventSourceARN" key should be set to the correct (or test) kinesis ARN.

If you do not, you will see errors like: TypeError: Cannot call method 'split' of undefined at Object.exports.getStreamName (/var/task/index.js:163:33) at exports.handler (/var/task/index.js:393:28)

Lastly, if the tag is set incorrectly on the Kinesis stream, you will see an error similar to: Delivery Stream undefined does not exist in region us-east-1

Firehose delivery missing?

I'm using dynamodb-continuous-backup for the first time, and have an issue I don't understand. I was monitoring the S3 bucket to test something else when I realized one of the inserts never arrived. DynamoDB shows 6 entries for a given partition key, but only 5 made it into S3. CloudWatch shows an appropriate entry "2017-05-10T19:19:28.861Z 14502b25-707b-4ecc-9697-4321b747b9c1 Event forwarding complete. Forwarded 1 batches comprising 1 records to Firehose production_tracking", but the data never arrived.

So for 6 inserts in a 20-hour period, the 3rd one is missing, and I didn't change anything except the data during this time. The CloudWatch log is the only evidence I can find.

Any ideas?

Exceeding 4MB limit

We're running 1.4.5.

We're commonly getting errors for exceeding the 4MB write limit to Firehose. This causes the lambda to retry indefinitely on the same data, causing processing on the Kinesis shard to backup until the data ages out.

We solved this by editing constants.js:

// firehose max PutRecordBatch size 4MB
FIREHOSE_MAX_BATCH_BYTES = 4 * 1024 * 1024;

Is now:

// firehose max PutRecordBatch size 2MB
FIREHOSE_MAX_BATCH_BYTES = 2 * 1024 * 1024;

And for good measure we also did the following:

FIREHOSE_MAX_BATCH_COUNT = 500;

Is now:

FIREHOSE_MAX_BATCH_COUNT = 250;

The above changes immediately solved our issue. Looking through the code, I'm not sure how a batch of > 4MB is able to get through, but it appears that was the case for us.

`finish` arguments constructed erroneously with `undefined` value

In the following code snippet:
https://github.com/awslabs/lambda-streams-to-firehose/blob/master/index.js#L472-L479

A map key is being deleted:
https://github.com/awslabs/lambda-streams-to-firehose/blob/master/index.js#L475

Then the key fetch will result in undefined?
https://github.com/awslabs/lambda-streams-to-firehose/blob/master/index.js#L477

Should the underlying value be assigned to a variable for comparison prior to delete being called on the key?

Function not found: arn:aws:lambda:us-west-2::function:LambdaStreamToFirehose

Using latest build in us-west-2.

DynamoDB: bobStackLocal-DynamoDBLocalLMV1-1NGPVVV0M72X1-relationship-1E00QQ9B3N27Y
arn:aws:dynamodb:us-west-2:$ACCT:table/pdhStackLocal-DynamoDBLocalLMV1-1NGPVVV0M72X1-relationship-1E00QQ9B3N27Y/stream/2016-11-07T23:50:50.147
Batch size: 1000Last result: PROBLEM: Function not found: arn:aws:lambda:us-west-2:$ACCT:function:LambdaStreamToFirehose (Service: AWSLambda; Status Code: 404; Error Code: ResourceNotFoundException; Request ID: 60166c14-2ab8-4475-98f0-926e5fc0537a)

myregex example

Hello, I want to add a delimiter between the records I am getting from the stream. Can you give an example of doing this using a regex? I am using a DynamoDB stream. Without a delimiter I am getting an error on Redshift that the delimiter is not found. Please help @IanMeyers @benoittgt, it's urgent. Thanks in advance.

ReferenceError: finish is not defined

When the Lambda authorisation doesn't include the 'ListTagsForStream' permission, we can't get the delivery stream name from the Kinesis tag. In this case of error, we call finish (line 326 in index.js).

At this point, I get the following exception. The problem is reproducible each time, using the 1.4.1 or 1.4.0

2016-12-08T17:52:53.127Z 9ada9f25-50a4-4039-9e0e-223cb687004f ReferenceError: finish is not defined at Response.<anonymous> (/var/task/index.js:326:3) at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:355:18) at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:105:20) at Request.emit (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:77:10) at Request.emit (/var/runtime/node_modules/aws-sdk/lib/request.js:668:14) at Request.transition (/var/runtime/node_modules/aws-sdk/lib/request.js:22:10) at AcceptorStateMachine.runTo (/var/runtime/node_modules/aws-sdk/lib/state_machine.js:14:12) at /var/runtime/node_modules/aws-sdk/lib/state_machine.js:26:10 at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:38:9) at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:670:12)

init does not call callback on subsequent runs

When init is initially called, it sets up the streams and does a callback to the handler.
When it is called a second time, it sees online === true, so it skips the callback, resulting in the lambda just failing silently.
Pull request incoming.

Question about lambda - error handling

Does the lambda have any logic that enables it to read the same messages from Kinesis again in the next invocation in case of failure? Are there checkpoints?

Properly find the error

Hello

The lambda works well. But today we had one error. I wanted to look at it.
(screenshot: capture d'écran 2016-12-12 à 17:19:21)

I went to CloudWatch, selected all the logs for the day, and searched that log group for the error; grepping for errorMessage should normally find it, but nothing came up. Am I missing something?
It seems we always end the function with done - shouldn't the status change to ERROR if something fails?

Thanks in advance

Cannot find module 'index'

I uploaded the .zip file to a Lambda function based on the node.js DynamoDB Streams blueprint. This configures a handler of index.handler. I was unable to download the zip file using Safari, but was successful using Chrome. However, Lambda complained that it was unable to unarchive the zip file. So, I just downloaded the folder and zipped it locally. The upload then succeeded, but I am getting the following error message after testing:

{
"errorMessage": "Cannot find module 'index'",
"errorType": "Error",
"stackTrace": [
"Function.Module._resolveFilename (module.js:338:15)",
"Function.Module._load (module.js:280:25)",
"Module.require (module.js:364:17)",
"require (module.js:380:17)"
]
}

require aws-kpl-deagg

After running ./build.sh, and uploading the resulting dist .zip file to lambda, I was getting the following error in my logs when the lambda function was invoked by a kinesis event:

Unable to import module 'index': Error at Function.Module._resolveFilename (module.js:338:15)

Seems like I had to install the deaggregator package first, even after running "npm install" for the first time (which, at present, seems to only include the async package).

npm install aws-kpl-deagg

Anyway, simple fix. Maybe it helps somebody else :)

Error on dynamodb stream to firehose: awslabs/dynamo_continuous_backup.

2016-11-03T16:05:24.322Z 81820a90-cb82-4566-90db-35866393c78b
{
"message": "Firehose LambdaStreamsDefaultDeliveryStream not found under account.",
"code": "ResourceNotFoundException",
"time": "2016-11-03T16:05:24.322Z",
"requestId": "54279dc9-a1df-11e6-a64e-d184623b1b33",
"statusCode": 400,
"retryable": false,
"retryDelay": 65.43006924912333
}

11/03 09:15 > grep -r LambdaStreamsDefaultDeliveryStream *
createDefaultDeliveryStream.sh:deliveryStreamName="LambdaStreamsDefaultDeliveryStream"
index.js: 'DEFAULT' : 'LambdaStreamsDefaultDeliveryStream'

QUESTION: What is the value of using Firehose with DynamoDb streams vs just writing directly to S3?

We are looking into dynamodb backup solutions and have been looking into this lib.
Out of the box it looks great, however we have a requirement to deploy to Montreal which doesn't currently support Lambda or Firehose.

So this is probably a dumb question and shows I'm not really understanding the value of Firehose, but if you already have a scaled stream processing application (processing your Dynamo Streams), why would you write to Firehose rather than directly to a destination like S3?

I've been looking into the DynamoDB Streams Kinesis Adapter, and considering building a stream handler that takes the Dynamo change stream and writes it to Firehose (potentially in a different region!!!) which then writes it to S3 in the Montreal region. (Obviously incurs extra transfer charges!)

But why would I do that if I'm already having to write the Kinesis Adapter application? Why not just write directly to S3 from the adapter application? But if that were true for a Kinesis Adapter application why wouldn't we just do that from a Lambda function too.

I can see how using Firehose gives you a level of generality enabling you to redirect easily to different destinations. But are there other scalability reasons why it still makes more sense to go through Firehose?

Thanks for any insights.

Small amount of event not streamed without errors

Hello

In my pipeline: the data comes in via a Kinesis Stream, I don't use the DynamoDB mapping, only the default, and Firehose writes the data into an S3 file.

Yesterday two events were not recorded into S3. I dug into the Kinesis Stream and found my two events:

#<struct Aws::Kinesis::Types::Record
  sequence_number="49*3138",
  approximate_arrival_timestamp=2017-02-13 17:18:39 +0100,
  data="3444946|2017-02-13 16:18:39.323|***",
  partition_key="partitionkey">,
#<struct Aws::Kinesis::Types::Record
  sequence_number="49*28130",
  approximate_arrival_timestamp=2017-02-13 17:18:39 +0100,
  data="3444947|2017-02-13 16:18:39.364|***",
  partition_key="partitionkey">,

In S3 I have a file with lots of events. I have the one before and the one after, but not these two:

3444945|2017-02-13 16:18:30.100|***
3444948|2017-02-13 16:18:41.832|**

I dug into CloudWatch to find errors in the lambda, but didn't see any issues.

Where should I look to understand why these records were not inserted into S3? 😞

Question around boolean `online` variable

Hi! First of all thanks for this lambda, it's very useful.

I'm trying to use it to deliver events from a Kinesis stream to Firehose, then S3, without any data transformation. Our architecture is moving towards event sourcing: we have small and frequent events that need to be processed quickly with streams, but we also need to store events to S3 to recover a previous state of the application. So this lambda does exactly what we need.

By the way, I don't understand the meaning of the online boolean variable: here it seems to prevent writing all of the events from the stream, and only the first one (within the buffer threshold) gets written.

Data come with surrounded quotes

Hello

First, thanks for this lambda. It's very efficient.
I have a problem with the data written to S3. I'm using aws-sdk-ruby like this:

payload = "29384721034|#{Time.now.strftime("%Y-%m-%d %H:%M:%S")}|987689|9898|desktop|982323|app:my.app.com|23789423|2.6"
20.times { kinesis.put_record(stream_name: 'appaloosa-int', data: payload, partition_key: 'partitionkey') }

The data is written like this in S3:

"29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323|app:my.app.com|23789423|2.6"
"29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323|app:my.app.com|23789423|2.6"
...

Before, I was using Firehose directly with the same data and had:

29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323|app:my.app.com|23789423|2.6
29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323|app:my.app.com|23789423|2.6
...

I've tried to tweak the COPY command and add REMOVEQUOTES but then I have a delimiter error :

raw_line : "29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323|app:my.app.com|23789423|2.6"
raw_field_value : 29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323|app:my.app.com|23789423|2.6
err_code : 1214
err_reason : Delimiter not found

I've tried to specify it without success.

And I'm stuck 😞
