Data comes surrounded by quotes


First, thanks for this lambda. It's very efficient.
I have a problem with the data written to S3. I'm using aws-sdk-ruby like this:

require 'aws-sdk'
kinesis = Aws::Kinesis::Client.new
payload = "29384721034|#{Time.now.strftime("%Y-%m-%d %H:%M:%S")}|987689|9898|desktop|982323||23789423|2.6"
20.times { kinesis.put_record(stream_name: 'appaloosa-int', data: payload, partition_key: 'partitionkey') }

The data is written to S3 like this:

"29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323||23789423|2.6"
"29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323||23789423|2.6"

Before, I was using Firehose with the same data and got:

29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323||23789423|2.6
29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323||23789423|2.6

I tried tweaking the COPY command to add REMOVEQUOTES, but then I get a delimiter error:

raw_line :"29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323||23789423|2.6"
raw_field_value : 29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323||23789423|2.6
err_code : 1214
err_reason : Delimiter not found

I've tried specifying the delimiter, without success.

And I'm stuck 😞
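One plausible explanation is that the lambda's transformer serializes each record with `JSON.stringify`. The report does not confirm this. For a plain string, `JSON.stringify` returns the text wrapped in double quotes, which matches the S3 output above.

This would also explain the REMOVEQUOTES failure. Redshift keeps every character inside the removed quotes, delimiters included. The whole line therefore lands in a single field (as raw_field_value shows), and Redshift finds no delimiter for the remaining columns.

The sketch below contrasts a JSON-serializing transformer with a pass-through one. Both function names are hypothetical, not the library's API.

```javascript
// Hypothetical transformers for illustration; not the library's API.

// Serializes the record as JSON: a plain string gains surrounding quotes.
function jsonTransformer(line) {
  return JSON.stringify(line) + "\n";
}

// Forwards the record text unchanged, adding only a newline row separator.
function passThroughTransformer(line) {
  return line + "\n";
}

const payload =
  "29384721034|2016-08-11 15:27:09|987689|9898|desktop|982323||23789423|2.6";

console.log(jsonTransformer(payload));        // quoted, as seen in S3
console.log(passThroughTransformer(payload)); // unquoted, as Firehose wrote it
```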

Exceeding 4MB limit

We're running 1.4.5.

We commonly get errors for exceeding the 4MB write limit to Firehose. This causes the lambda to retry indefinitely on the same data, so processing on the Kinesis shard backs up until the data ages out.

We solved this by editing constants.js:

// firehose max PutRecordBatch size 4MB
FIREHOSE_MAX_BATCH_BYTES = 4 * 1024 * 1024;

Is now:

// firehose max PutRecordBatch size 2MB
FIREHOSE_MAX_BATCH_BYTES = 2 * 1024 * 1024;

And for good measure we also did the following:


Is now:


The above changes immediately solved our issue. Looking through the code, I'm not sure how a batch larger than 4MB could get through, but it appears that was the case for us.
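The report does not say how an oversized batch got through. One possible cause, offered only as a hypothesis, is measuring batch size in JavaScript string length (UTF-16 code units) rather than in bytes. Any multi-byte UTF-8 character makes the byte count larger than `length`.

The sketch below uses a hypothetical `splitIntoBatches` helper. It counts bytes with `Buffer.byteLength` and enforces both documented PutRecordBatch limits: 4 MiB per call and 500 records per call.

```javascript
// Hypothetical helper; not the project's batching code.
const MAX_BATCH_BYTES = 4 * 1024 * 1024; // PutRecordBatch size limit (4 MiB)
const MAX_BATCH_RECORDS = 500; // PutRecordBatch record-count limit

// Split records (strings or Buffers) into batches that respect both limits,
// measuring size in UTF-8 bytes rather than string length.
function splitIntoBatches(records, maxBytes = MAX_BATCH_BYTES, maxRecords = MAX_BATCH_RECORDS) {
  const batches = [];
  let current = [];
  let currentBytes = 0;
  for (const record of records) {
    const size = Buffer.isBuffer(record) ? record.length : Buffer.byteLength(record, "utf8");
    if (size > maxBytes) {
      throw new Error(`record of ${size} bytes exceeds the ${maxBytes}-byte batch limit`);
    }
    // Start a new batch if adding this record would break either limit.
    if (current.length > 0 && (currentBytes + size > maxBytes || current.length >= maxRecords)) {
      batches.push(current);
      current = [];
      currentBytes = 0;
    }
    current.push(record);
    currentBytes += size;
  }
  if (current.length > 0) batches.push(current);
  return batches;
}

console.log("Ø".length, Buffer.byteLength("Ø", "utf8")); // 1 2
```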

Question around boolean `online` variable

Hi! First of all, thanks for this lambda; it's very useful.

I'm trying to use it to deliver events from a Kinesis stream to Firehose, then S3, without any data transformation. Our architecture is moving towards event sourcing: we have small and frequent events that need to be processed quickly with streams, but we also need to store events to S3 to recover a previous state of the application. So this lambda does exactly what we need.

By the way, I don't understand the purpose of the `online` boolean variable: here it seems to prevent writing all of the events from the stream, writing just the first one (within the buffer threshold).

Question on how to configure function for DDB Stream delivery to S3

To configure the module to run for DynamoDB Update Streams, simply:

  • Create a Firehose Delivery Stream with the same name as the DynamoDB Table in the same Region
  • Configure Update Streams on your DynamoDB table, ideally with the NEW_AND_OLD_IMAGES view type
  • Deploy the LambdaStreamsToFirehose function to AWS Lambda
  • Create the Event Source for the Lambda function from the DynamoDB Table Update Stream
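The delivery stream shares the table's name, so the function has to recover the table name from each record's `eventSourceARN`. A DynamoDB stream ARN has the form `arn:aws:dynamodb:<region>:<account>:table/<TableName>/stream/<label>`. The stream label is a timestamp that itself contains colons.

Here is a minimal sketch with a hypothetical helper name:

```javascript
// Hypothetical helper: derive the table name (and so the delivery stream name)
// from a DynamoDB stream ARN such as
// arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/2015-06-27T00:48:05.899
function tableNameFromStreamArn(arn) {
  if (typeof arn !== "string") {
    throw new TypeError("eventSourceARN is missing");
  }
  // The first five colon-separated fields are arn, partition, service, region
  // and account; the rest is the resource, which may itself contain colons.
  const resource = arn.split(":").slice(5).join(":");
  const parts = resource.split("/");
  if (parts[0] !== "table" || !parts[1]) {
    throw new Error(`not a DynamoDB table stream ARN: ${arn}`);
  }
  return parts[1];
}
```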

myregex example

Hello, I want to add a delimiter between the records I am getting from the stream. Can you give an example of doing it with a regex? I am using a DynamoDB stream. Without a delimiter, I get a "Delimiter not found" error in Redshift. Please help @IanMeyers @benoittgt, it's urgent. Thanks in advance.
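Rather than a regex, one way to get delimited rows from a DynamoDB stream is to flatten each record's `NewImage` into the table's column order. Values are joined with `|` and each row ends with a newline.

The helper names and the column list below are hypothetical. The sketch handles the scalar `S`, `N`, `BOOL` and `NULL` types and writes other types as JSON text. It does not escape a `|` that appears inside a value.

```javascript
// Hypothetical sketch: convert a DynamoDB stream NewImage (DynamoDB JSON,
// e.g. { id: { N: "1" }, name: { S: "bob" } }) into one delimited row.

// Extract a scalar value; missing or NULL attributes become empty fields.
function scalarValue(attribute) {
  if (attribute === undefined || "NULL" in attribute) return "";
  if ("S" in attribute) return attribute.S;
  if ("N" in attribute) return attribute.N;
  if ("BOOL" in attribute) return String(attribute.BOOL);
  // Lists, maps and sets are written as JSON text.
  return JSON.stringify(attribute);
}

// columns must match the column order of the Redshift target table.
function toDelimitedRow(newImage, columns, delimiter = "|") {
  return columns.map((column) => scalarValue(newImage[column])).join(delimiter) + "\n";
}

const row = toDelimitedRow(
  { id: { N: "1" }, name: { S: "bob" }, active: { BOOL: true } },
  ["id", "name", "active", "missing"]
);
console.log(row); // 1|bob|true|
```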

init does not call callback on subsequent runs

When init is initially called, it sets up the streams and does a callback to the handler.
When it is called a second time, it sees online === true, so it skips the callback, and the lambda just fails silently.
Pull request incoming.
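The fix described amounts to making the already-initialized path invoke the callback too. In the minimal sketch below, the `online` flag mirrors the one in the report. The setup counter is a hypothetical stand-in for the real initialization work.

```javascript
// Hypothetical sketch of one-time initialisation that always calls back.
// Module-level state survives between invocations in a warm Lambda container.
let online = false;
let setupRuns = 0;

function init(callback) {
  if (online) {
    // Already initialised: skip setup but still hand control back,
    // otherwise the invocation never continues.
    return callback(null);
  }
  setupRuns += 1; // placeholder for the real setup work
  online = true;
  return callback(null);
}

init(() => console.log("first invocation continues"));
init(() => console.log("second invocation continues"));
```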

Access denied to s3 buckets housing

An error occurred (AccessDeniedException) when calling the CreateFunction operation: Your access has been denied by S3, please make sure your request credentials have permission to GetObject for aws-lambda-streams-to-firehose-us-west-2/ S3 Error Code: AccessDenied. S3 Error Message: Access Denied: ClientError
Traceback (most recent call last):

Problems with international characters


When dumping a message with international characters to an S3 Firehose, the international characters seem to get mangled in the resulting S3 file. When tailing the Kinesis stream with the AWS CLI and base64-decoding the result as UTF-8, the result is correct, but when opening the resulting S3 file, the characters are replaced with something I haven't been able to decode.
"Østover" becomes "C[CAN]stover" (CAN being the ASCII Cancel symbol)
"ELÅ2" becomes "ELC[ENQ]2" (ENQ being the ASCII Enquiry symbol)

Any thoughts on what could be wrong?
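The corruption pattern is consistent with the payload being decoded as `'ascii'` somewhere in Node.js, though the report does not show where. In UTF-8, `Ø` is the two bytes `0xC3 0x98` and `Å` is `0xC3 0x85`.

Node's `'ascii'` decoding clears the high bit of each byte. That turns `0xC3` into `0x43` (`C`), `0x98` into `0x18` (CAN) and `0x85` into `0x05` (ENQ), exactly the characters reported.

The sketch reproduces this by base64-round-tripping the strings, the way a Kinesis record carries them to Lambda:

```javascript
// Kinesis delivers record data to Lambda base64-encoded.
function roundTrip(text, decodeAs) {
  const base64 = Buffer.from(text, "utf8").toString("base64");
  return Buffer.from(base64, "base64").toString(decodeAs);
}

// 'ascii' clears the high bit of every byte, mangling multi-byte characters.
console.log(JSON.stringify(roundTrip("Østover", "ascii"))); // "C\u0018stover"
// Decoding as 'utf8' preserves the original text.
console.log(roundTrip("Østover", "utf8")); // Østover
```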

Typo in tagging script

Currently the "" script has the line:

aws kinesis add-tags-to-stream --stream-name $1 --Tags ForwardToFirehoseStream=$2 --region $3

But according to the Kinesis docs, the option should actually be lowercase: --tags.

Unless you change it to lowercase, you get this error:

usage: aws [options] <command> <subcommand> [<subcommand> ...] [parameters]
To see help text, you can run:

  aws help
  aws <command> help
  aws <command> <subcommand> help
aws: error: argument --tags is required

Simple bug fix, and easy enough to work around, just wanted to put it on your radar.

Btw, thanks for this repo, this fits perfectly for what my company is trying to do with streams.

Compression support?

Is there any plan to add compression support when putting data into Firehose?

Since Firehose charges are incurred based on the volume of data ingress, it would behoove a rational economic actor to minimize the amount of data being sent to Firehose (and thus compress data beforehand rather than using the compression option in Firehose to minimize the amount of data being written to S3).
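Here is a sketch of what client-side compression could look like, using Node's built-in `zlib`. The `compressBatch` helper is hypothetical and not something the project offers.

Each Firehose record would then carry gzip bytes. Firehose's own compression option should stay off, otherwise the data is compressed twice. Every downstream reader must also gunzip the records.

```javascript
const zlib = require("zlib");

// Hypothetical helper: join newline-terminated rows and gzip them into one
// Firehose record payload.
function compressBatch(rows) {
  return zlib.gzipSync(Buffer.from(rows.join(""), "utf8"));
}

const rows = Array.from({ length: 100 }, (_, i) => `row-${i}|desktop|2.6\n`);
const compressed = compressBatch(rows);
console.log(Buffer.byteLength(rows.join("")), "->", compressed.length, "bytes");
```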

ReferenceError: finish is not defined

When the Lambda function's permissions don't include 'ListTagsForStream', we can't get the delivery stream name from the Kinesis tag. In this error case, we call finish (line 326 in index.js).

At that point, I get the following exception. The problem is reproducible every time, with both 1.4.1 and 1.4.0:

2016-12-08T17:52:53.127Z 9ada9f25-50a4-4039-9e0e-223cb687004f ReferenceError: finish is not defined
at Response.<anonymous> (/var/task/index.js:326:3)
at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:355:18)
at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:105:20)
at Request.emit (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:77:10)
at Request.emit (/var/runtime/node_modules/aws-sdk/lib/request.js:668:14)
at Request.transition (/var/runtime/node_modules/aws-sdk/lib/request.js:22:10)
at AcceptorStateMachine.runTo (/var/runtime/node_modules/aws-sdk/lib/state_machine.js:14:12)
at /var/runtime/node_modules/aws-sdk/lib/state_machine.js:26:10
at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:38:9)
at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:670:12)

Support for filtering by event types

Hey there! 👋

We're using this repo's code as a lambda to write DynamoDB input to Firehose stream output. Now we have come across an "AppendOnly" use case, where we need to write only the INSERT events and ignore the MODIFY/REMOVE events.

We've implemented a change on top of this repo, adding writableEventTypes as a comma-separated environment variable for the lambda. We just wanted to check whether anyone has a similar use case, and whether we could open a PR for it against this repo.

Of course, if there's a more compatible way of adding this change, please let me know and I'll then open the PR. I just wanted to check that this change is acceptable to the maintainers of this repo before going ahead and opening a PR! 😄

And awesome work with this repo! TIA! 🎉
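Here is a minimal sketch of how a comma-separated `writableEventTypes` variable, as described above, could drive the filtering. The helper names are hypothetical. An unset variable keeps the current behavior of forwarding every event.

```javascript
// Hypothetical sketch: filter DynamoDB stream records by eventName using a
// comma-separated environment variable such as writableEventTypes="INSERT".

// Returns null when the variable is unset, meaning "forward everything".
function parseEventTypes(value) {
  if (!value) return null;
  return new Set(
    value.split(",").map((type) => type.trim().toUpperCase()).filter(Boolean)
  );
}

function filterRecords(records, allowedTypes) {
  if (allowedTypes === null) return records;
  return records.filter((record) => allowedTypes.has(record.eventName));
}

const allowed = parseEventTypes(process.env.writableEventTypes);
```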

require aws-kpl-deagg

After running ./, and uploading the resulting dist .zip file to lambda, I was getting the following error in my logs when the lambda function was invoked by a kinesis event:

Unable to import module 'index': Error at Function.Module._resolveFilename (module.js:338:15)

It seems I had to install the deaggregator package separately, even after running "npm install" for the first time (which, at present, seems to install only the async package):

npm install aws-kpl-deagg

Anyway, simple fix. Maybe it helps somebody else :)

How to change delivery stream for existing input stream (using tags)?

Updating the ForwardToFirehoseStream tag on a Kinesis stream doesn't affect delivery: the stream still goes to the old Firehose. From looking at the .js, it seems the deliveryStreamMapping is defined only once, at the top of the file before the handler. I guess this is really more of a Lambda question, but anyway: how can I get the Lambda function to pick up the updated delivery stream tag?
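Module-level state lives as long as the Lambda container. A mapping read once at cold start is therefore not refreshed while that container stays warm. One option is to give each cached entry a lifetime.

In the hypothetical sketch below, `lookup` stands in for whatever reads the `ForwardToFirehoseStream` tag; the real call would be asynchronous. The clock is injectable so the expiry can be tested.

```javascript
// Hypothetical sketch: a per-stream cache whose entries expire after ttlMs,
// so a changed ForwardToFirehoseStream tag is picked up without a redeploy.
function createMappingCache(lookup, ttlMs, now = Date.now) {
  const entries = new Map();
  return function deliveryStreamFor(streamName) {
    const cached = entries.get(streamName);
    if (cached && now() - cached.fetchedAt < ttlMs) {
      return cached.value;
    }
    const value = lookup(streamName);
    entries.set(streamName, { value, fetchedAt: now() });
    return value;
  };
}
```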

Problem with a Cloudwatch Logs Destination to Kinesis stream?

I am using a CWL Destination sending to a Stream and then using your Lambda to send it on to Firehose - S3, no Firehose compression or encryption.
The files that show up look like they are in some strange Unicode format. Is there an issue with using a Destination as the CWL subscription to a Kinesis stream here?
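CloudWatch Logs subscriptions write to Kinesis in compressed form. The data of each record is a base64-encoded, gzip-compressed JSON document with fields such as `messageType`, `logGroup`, `logStream` and `logEvents`. Forwarded as-is, S3 receives binary gzip, which would look like the strange encoding described.

Here is a minimal sketch of decoding one record, with hypothetical helper names. It skips `CONTROL_MESSAGE` documents, which CloudWatch Logs emits mainly to check that the destination is reachable.

```javascript
const zlib = require("zlib");

// Hypothetical helper: decode the base64 data of a Kinesis record produced by
// a CloudWatch Logs subscription into the JSON document it contains.
function decodeLogsRecord(base64Data) {
  const json = zlib.gunzipSync(Buffer.from(base64Data, "base64")).toString("utf8");
  return JSON.parse(json);
}

// Turn a decoded DATA_MESSAGE into newline-separated log messages for S3;
// any other message type (e.g. CONTROL_MESSAGE) yields no output.
function toLogLines(document) {
  if (document.messageType !== "DATA_MESSAGE") return "";
  return document.logEvents.map((event) => event.message + "\n").join("");
}
```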

Example file contents in S3:


Add tests


I'm using this lambda in production, and my team and I love to use code that is tested. I wonder whether testing the behavior of this lambda is something other people want?

If so, what would your requirements be? Local testing (e.g. mocking external calls with proxyquire)?
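One lightweight alternative to module mocking is to pass the Firehose client in as a parameter, so a test can substitute a fake. The sketch assumes the callback-style `putRecordBatch(params, callback)` of the AWS SDK for JavaScript v2. `forwardBatch` and `fakeFirehose` are hypothetical, not part of this project, and the checks use only Node's built-in `assert`.

```javascript
// Hypothetical function under test: sends records to Firehose and reports
// partial failures, which PutRecordBatch signals through FailedPutCount.
function forwardBatch(firehose, deliveryStreamName, records, callback) {
  const params = {
    DeliveryStreamName: deliveryStreamName,
    Records: records.map((data) => ({ Data: data })),
  };
  firehose.putRecordBatch(params, (err, response) => {
    if (err) return callback(err);
    if (response.FailedPutCount > 0) {
      return callback(new Error(`${response.FailedPutCount} records failed`));
    }
    return callback(null, records.length);
  });
}

// A fake client records each request and returns a canned response.
function fakeFirehose(response) {
  const calls = [];
  return {
    calls,
    putRecordBatch(params, cb) {
      calls.push(params);
      cb(null, response);
    },
  };
}
```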


Update documentation on Lambda tests and handler

I found it difficult to get started with the code because I was missing a few helpful pieces that could be covered in the README file. I would be happy to submit a pull request, but it seems easy enough to describe here.

  1. The correct handler to use in the Lambda function is "index.handler". This is the default, but the README deployment section doesn't say what it should be.
  2. (kinesis) When configuring the "test" section in the lambda tab, it is useful to mention that the sample event data template should be Kinesis, and that the "eventSourceARN" key should be set to the correct (or test) kinesis ARN.

If you do not, you will see errors like: TypeError: Cannot call method 'split' of undefined at Object.exports.getStreamName (/var/task/index.js:163:33) at exports.handler (/var/task/index.js:393:28)

Lastly, if the tag is set incorrectly on the Kinesis stream, you will see an error similar to: Delivery Stream undefined does not exist in region us-east-1
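For the console test described in item 2, a Kinesis-shaped event only needs the fields the handler reads. The minimal sketch below builds one. The stream name, account ID and region are placeholders, and `makeKinesisTestEvent` and `streamNameFromArn` are hypothetical helpers. The second one shows why a missing `eventSourceARN` produces the `split` error above.

```javascript
// Build a minimal Kinesis test event; account, region and stream are placeholders.
function makeKinesisTestEvent(streamArn, text) {
  return {
    Records: [
      {
        eventSource: "aws:kinesis",
        eventSourceARN: streamArn,
        awsRegion: streamArn.split(":")[3],
        kinesis: {
          partitionKey: "partitionKey-1",
          sequenceNumber: "1",
          // Kinesis record data is base64-encoded in Lambda events.
          data: Buffer.from(text, "utf8").toString("base64"),
        },
      },
    ],
  };
}

// Hypothetical helper: "arn:aws:kinesis:<region>:<account>:stream/<name>" -> "<name>".
// With a missing ARN, calling .split throws a TypeError like the one reported.
function streamNameFromArn(arn) {
  return arn.split(":")[5].split("/")[1];
}

const event = makeKinesisTestEvent(
  "arn:aws:kinesis:us-east-1:123456789012:stream/my-test-stream",
  "hello"
);
console.log(streamNameFromArn(event.Records[0].eventSourceARN)); // my-test-stream
```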

confusing data being written to S3 from the firehose


I've got this function up and running, processing a DynamoDB stream into a Kinesis Firehose that then writes to S3. The only data I'm seeing in S3 is endless lines of this:

[object Object]
[object Object]
[object Object]
[object Object]
[object Object]
[object Object]
[object Object]

Any idea what I need to do in order to get meaningful data relating to the changes to DynamoDB records written out to S3?
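`[object Object]` is what JavaScript produces when a plain object is converted to a string implicitly, for example by concatenating it with `"\n"`. That suggests a record object reached Firehose without being serialized, though the report cannot confirm it. The difference in a few lines:

```javascript
const newImage = { id: { N: "1" }, name: { S: "bob" } };

// Implicit string conversion of a plain object loses its contents.
const broken = newImage + "\n";
// Serializing explicitly keeps the data.
const fixed = JSON.stringify(newImage) + "\n";

console.log(broken); // [object Object]
console.log(fixed);  // {"id":{"N":"1"},"name":{"S":"bob"}}
```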


Not sure this project works

I've been trying to use this project without success.

I have a DynamoDB table whose name matches my table name in Redshift.

I've set up the DynamoDB table to stream to the Lambda function, to which I uploaded this project's zip file, with the default stream configuration disabled.

I've set up a Kinesis Firehose connection to Redshift.

The issue is that the Redshift load fails because the data it is trying to load is not in any format Redshift expects. You reference line numbers all over the place in the documentation, but they don't match up because they were not updated along with the code. I'm just not sure this code has ever been tested with DynamoDB streams, or that it works.

The attached file is the file it's attempting to load via the COPY command in redshift.

Error on dynamodb stream to firehose: awslabs/dynamo_continuous_backup.

2016-11-03T16:05:24.322Z 81820a90-cb82-4566-90db-35866393c78b
"message": "Firehose LambdaStreamsDefaultDeliveryStream not found under account.",
"code": "ResourceNotFoundException",
"time": "2016-11-03T16:05:24.322Z",
"requestId": "54279dc9-a1df-11e6-a64e-d184623b1b33",
"statusCode": 400,
"retryable": false,
"retryDelay": 65.43006924912333

11/03 09:15 > grep -r LambdaStreamsDefaultDeliveryStream *
index.js: 'DEFAULT' : 'LambdaStreamsDefaultDeliveryStream'

`finish` arguments constructed erroneously with `undefined` value

In the following code snippet:

A map key is being deleted:

Then fetching the key will return undefined?

Shouldn't the underlying value be assigned to a variable for comparison before delete is called on the key?
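In general JavaScript terms the concern is well founded: once a property is deleted, reading it returns `undefined`, so the value has to be captured first. The original snippet is not shown here, so the names below are hypothetical.

```javascript
// Hypothetical map of pending records per delivery stream.
const pending = { "my-delivery-stream": ["a\n", "b\n"] };

// Reading after delete yields undefined.
function takeWrong(map, key) {
  delete map[key];
  return map[key];
}

// Capture the value first, then delete the key.
function takeRight(map, key) {
  const value = map[key];
  delete map[key];
  return value;
}
```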

Cannot find module 'index'

I uploaded the .zip file to a Lambda function based on the node.js DynamoDB Streams blueprint. This configures the handler index.handler. I was unable to download the zip file using Safari, but succeeded using Chrome. However, Lambda complained that it was unable to unarchive the zip file, so I just downloaded the folder and zipped it locally. The upload then succeeded, but I am getting the following error message after testing:

"errorMessage": "Cannot find module 'index'",
"errorType": "Error",
"stackTrace": [
"Function.Module._resolveFilename (module.js:338:15)",
"Function.Module._load (module.js:280:25)",
"Module.require (module.js:364:17)",
"require (module.js:380:17)"

QUESTION: What is the value of using Firehose with DynamoDb streams vs just writing directly to S3?

We are looking into dynamodb backup solutions and have been looking into this lib.
Out of the box it looks great, however we have a requirement to deploy to Montreal which doesn't currently support Lambda or Firehose.

So this is probably a dumb question and shows I'm not really understanding the value of Firehose, but if you already have a scaled stream processing application (processing your Dynamo Streams) why would you write to Firehose rather than directly to a destination like S3.

I've been looking into the DynamoDB Streams Kinesis Adapter, and considering building a stream handler that takes the Dynamo change stream and writes it to Firehose (potentially in a different region!!!) which then writes it to S3 in the Montreal region. (Obviously incurs extra transfer charges!)

But why would I do that if I'm already having to write the Kinesis Adapter application? Why not just write directly to S3 from the adapter application? And if that holds for a Kinesis Adapter application, why wouldn't we just do the same from a Lambda function too?

I can see how using Firehose gives you a level of generality enabling you to redirect easily to different destinations. But are there other scalability reasons why it still makes more sense to go through Firehose?

Thanks for any insights.

Duplications when trying to switch to a new version


Recently I turned off the event source of the original lambda (lambda-streams-to-firehose), started a new lambda separately (fork-lambda-streams-to-firehose), then turned on the event source for fork-lambda-streams-to-firehose. The new lambda replayed all the data from the stream.

In the end, the last 24 hours of data was inserted twice into Redshift. I thought the lambda updated a checkpoint on the data stream to record which records had been read. Am I correct?

Thanks in advance.

Question about lambda - error handling

Does the lambda have any logic that lets it read the same messages from Kinesis again in the next invocation in case of failure? Are there checkpoints?

Is there any Python code for this functionality?

I'm trying to write this in Python, but when I test my Python code, the dynamic partitioning format I configured for my delivery stream doesn't work the way it does with this repo's Node.js code. I have tried various options, but nothing works.

Filtering by event type

I'm in the process of setting up DynamoDB archiving and was using the guide here:

That referred me to this repository for pushing my TTL'd data into Firehose from Lambda. That works great!
However, I'm getting every change made to DynamoDB rather than just the TTL'd docs. There is some verbiage in the link I provided above indicating that I can check for the event name, etc.

In this example, you archive all item changes to S3. If you wanted to archive only the items deleted by TTL, you could archive only the records where eventName is REMOVE and userIdentity contains PrincipalID equal to

Can you guys give me a snippet or some guidance on how to do this? I assume I need to tweak the index file to handle this case.


I ended up just updating line 467 to check for the removal event and principalId as follows:

if (record.eventName == "REMOVE" && record.userIdentity && 
    record.userIdentity.principalId == "") {
         // dynamo update stream record
         var data = exports.createDynamoDataItem(record);

Would you guys have a better suggestion?
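AWS documents that items deleted by TTL appear in the stream as `REMOVE` records whose `userIdentity` has `type` `"Service"` and `principalId` `"dynamodb.amazonaws.com"`. The sketch below expresses the check as a standalone predicate that could be called before building the data item. The function name is hypothetical.

```javascript
// Hypothetical predicate: true for stream records produced by TTL expiry.
function isTtlDeletion(record) {
  return (
    record.eventName === "REMOVE" &&
    record.userIdentity != null &&
    record.userIdentity.type === "Service" &&
    record.userIdentity.principalId === "dynamodb.amazonaws.com"
  );
}
```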

Function not found: arn:aws:lambda:us-west-2::function:LambdaStreamToFirehose

Using latest build in us-west-2.

DynamoDB: bobStackLocal-DynamoDBLocalLMV1-1NGPVVV0M72X1-relationship-1E00QQ9B3N27Y
Batch size: 1000
Last result: PROBLEM: Function not found: arn:aws:lambda:us-west-2:$ACCT:function:LambdaStreamToFirehose (Service: AWSLambda; Status Code: 404; Error Code: ResourceNotFoundException; Request ID: 60166c14-2ab8-4475-98f0-926e5fc0537a)

Properly find the error


The lambda works well, but today we had one error and I wanted to look into it.

I went to CloudWatch, selected all of the day's logs, and searched that log group for errorMessage, which should normally match, but found nothing. Am I missing something?
It seems the function always ends with done; shouldn't it change the status to ERROR if something fails?

Thanks in advance
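In the Node.js Lambda programming model, an invocation counts as an error only when the handler passes an error to its callback (or the older `context.done(err)`). In the sketch below, `processRecords` is a hypothetical stand-in for the forwarding logic, and the handler propagates its failures.

```javascript
// Hypothetical handler factory: processRecords stands in for the real
// forwarding logic and calls back with an error on failure.
function makeHandler(processRecords) {
  return function handler(event, context, callback) {
    processRecords(event.Records, (err, forwarded) => {
      if (err) {
        console.error("Forwarding failed:", err.message);
        // Passing the error marks the invocation as failed (and, for stream
        // sources, makes Lambda retry the batch).
        return callback(err);
      }
      return callback(null, `Forwarded ${forwarded} records`);
    });
  };
}
```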

This operation is not permitted on KinesisStreamAsSource delivery stream type.

My putRecordBatchParams:
{ "DeliveryStreamName": "Delivery-TTL-Teste", "Records": [ { "Data": { "type": "Buffer", "data": [ 123, 34, 75, 101, 121, 115, 34, 58, 123, 34, 115, 101, 115, 115, 105, 111, 110, 105, 100, 34, 58, 123, 34, 83, 34, 58, 34, 98, 56, 99, 52, 57, 97, 99, 56, 45, 97, 49, 56, 49, 45, 52, 100, 53, 98, 45, 57, 49, 98, 102, 45, 101, 101, 100, 97, 52, 100, 100, 99, 98, 98, 51, 50, 34, 125, 125, 44, 34, 79, 108, 100, 73, 109, 97, 103, 101, 34, 58, 123, 34, 67, 114, 101, 97, 116, 105, 111, 110, 84, 105, 109, 101, 34, 58, 123, 34, 78, 34, 58, 34, 49, 53, 52, 55, 54, 54, 51, 54, 49, 55, 34, 125, 44, 34, 83, 101, 115, 115, 105, 111, 110, 68, 97, 116, 97, 34, 58, 123, 34, 77, 34, 58, 123, 34, 82, 101, 116, 97, 105, 108, 101, 114, 34, 58, 123, 34, 83, 34, 58, 34, 115, 104, 111, 112, 114, 105, 116, 101, 34, 125, 44, 34, 84, 105, 109, 101, 79, 102, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 34, 58, 123, 34, 78, 34, 58, 34, 49, 53, 52, 55, 54, 54, 51, 54, 49, 55, 34, 125, 44, 34, 65, 109, 111, 117, 110, 116, 34, 58, 123, 34, 78, 34, 58, 34, 55, 56, 57, 56, 34, 125, 44, 34, 78, 97, 109, 101, 34, 58, 123, 34, 83, 34, 58, 34, 119, 105, 108, 108, 105, 97, 109, 34, 125, 125, 125, 44, 34, 115, 101, 115, 115, 105, 111, 110, 105, 100, 34, 58, 123, 34, 83, 34, 58, 34, 98, 56, 99, 52, 57, 97, 99, 56, 45, 97, 49, 56, 49, 45, 52, 100, 53, 98, 45, 57, 49, 98, 102, 45, 101, 101, 100, 97, 52, 100, 100, 99, 98, 98, 51, 50, 34, 125, 44, 34, 69, 120, 112, 105, 114, 97, 116, 105, 111, 110, 84, 105, 109, 101, 34, 58, 123, 34, 78, 34, 58, 34, 49, 53, 52, 55, 54, 54, 55, 50, 49, 55, 34, 125, 125, 44, 34, 83, 101, 113, 117, 101, 110, 99, 101, 78, 117, 109, 98, 101, 114, 34, 58, 34, 49, 57, 48, 50, 56, 53, 49, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 51, 55, 57, 57, 49, 50, 52, 55, 49, 34, 44, 34, 83, 105, 122, 101, 66, 121, 116, 101, 115, 34, 58, 50, 48, 53, 44, 34, 65, 112, 112, 114, 111, 120, 105, 109, 97, 116, 101, 67, 114, 101, 97, 116, 105, 111, 110, 68, 97, 116, 101, 84, 105, 109, 101, 34, 
58, 49, 53, 52, 55, 54, 54, 55, 55, 55, 53, 44, 34, 101, 118, 101, 110, 116, 78, 97, 109, 101, 34, 58, 34, 82, 69, 77, 79, 86, 69, 34, 44, 34, 117, 115, 101, 114, 73, 100, 101, 110, 116, 105, 116, 121, 34, 58, 123, 34, 112, 114, 105, 110, 99, 105, 112, 97, 108, 73, 100, 34, 58, 34, 100, 121, 110, 97, 109, 111, 100, 98, 46, 97, 109, 97, 122, 111, 110, 97, 119, 115, 46, 99, 111, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 83, 101, 114, 118, 105, 99, 101, 34, 125, 125, 10 ] } } ] }

2019-01-18T02:07:20.961Z fae1d2a0-12e2-4bfe-9864-1bb7a5da5cf3 { InvalidArgumentException: This operation is not permitted on KinesisStreamAsSource delivery stream type.
at Request.extractError (/var/runtime/node_modules/aws-sdk/lib/protocol/json.js:48:27)
at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:105:20)
at Request.emit (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:77:10)
at Request.emit (/var/runtime/node_modules/aws-sdk/lib/request.js:683:14)
at Request.transition (/var/runtime/node_modules/aws-sdk/lib/request.js:22:10)
at AcceptorStateMachine.runTo (/var/runtime/node_modules/aws-sdk/lib/state_machine.js:14:12)
at /var/runtime/node_modules/aws-sdk/lib/state_machine.js:26:10
at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:38:9)
at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:685:12)
at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:115:18)
message: 'This operation is not permitted on KinesisStreamAsSource delivery stream type.',
code: 'InvalidArgumentException',
time: 2019-01-18T02:07:20.961Z,
requestId: 'e1d8a349-23f4-bc6d-b563-94519964e77c',
statusCode: 400,
retryable: false,
retryDelay: 71.66140146807749 }

What is my problem? Any ideas?
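A delivery stream of type `KinesisStreamAsSource` reads from its Kinesis data stream itself. Firehose disables direct `PutRecord` and `PutRecordBatch` calls for it, which matches the error shown. Firehose reports the type in the `DescribeDeliveryStream` response as `DeliveryStreamDescription.DeliveryStreamType`, and a function that calls `PutRecordBatch` needs a `DirectPut` delivery stream.

Here is a sketch of a pre-flight check on that response. The function name is hypothetical, and the response objects in the test are trimmed examples.

```javascript
// Hypothetical check on a DescribeDeliveryStream response: only DirectPut
// delivery streams accept PutRecordBatch from this function.
function assertAcceptsDirectPut(describeResponse) {
  const description = describeResponse.DeliveryStreamDescription;
  const type = description.DeliveryStreamType;
  if (type !== "DirectPut") {
    throw new Error(
      `${description.DeliveryStreamName} has type ${type}; ` +
        "PutRecordBatch requires a DirectPut delivery stream"
    );
  }
  return description.DeliveryStreamName;
}
```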

Firehose delivery missing?

I'm using dynamodb-continuous-backup for the first time and have an issue I don't understand. I was monitoring the S3 bucket to test something else when I realized one of the inserts never arrived. DynamoDB shows 6 entries for a given partition key, but only 5 made it into S3. CloudWatch shows an appropriate entry, "2017-05-10T19:19:28.861Z 14502b25-707b-4ecc-9697-4321b747b9c1 Event forwarding complete. Forwarded 1 batches comprising 1 records to Firehose production_tracking", but the data never arrived.

So of 6 inserts over a 20-hour period, the 3rd one is missing, and I didn't change anything except the data during that time. The CloudWatch log is the only evidence I can find.

Any ideas?

Duplicated records

Hello @IanMeyers and others

I'm getting random duplicate rows. For the last day:


I can see the duplicate entries on s3 file but not when reading kinesis stream content.

Trying to investigate.

Small number of events not streamed, with no errors


In my pipeline, the data comes from a Kinesis stream (I don't use the DynamoDB mapping, only the default), and Firehose writes the data into an S3 file.

Yesterday two events were not recorded in S3. I dug into the Kinesis stream and found my two events:

#<struct Aws::Kinesis::Types::Record
  approximate_arrival_timestamp=2017-02-13 17:18:39 +0100,
  data="3444946|2017-02-13 16:18:39.323|***",
#<struct Aws::Kinesis::Types::Record
  approximate_arrival_timestamp=2017-02-13 17:18:39 +0100,
  data="3444947|2017-02-13 16:18:39.364|***",

In S3 I have a file with lots of events. It contains the event before and the event after, but not these two:

3444945|2017-02-13 16:18:30.100|***
3444948|2017-02-13 16:18:41.832|**

I dug into CloudWatch looking for errors in the lambda but didn't see any issues.

Where should I look to understand why these records were not inserted into S3? 😞
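One place worth checking, offered as a possibility rather than a diagnosis: `PutRecordBatch` can succeed as a call while rejecting individual records. The response's `FailedPutCount` counts them. Each entry in `RequestResponses` (same order as the request) carries an `ErrorCode` for a rejected record. If those are not inspected and retried, records vanish without an error in the logs.

Here is a sketch of selecting the records to resend. The helper name is hypothetical.

```javascript
// Hypothetical helper: return the records that PutRecordBatch rejected, using
// the positional RequestResponses array of the response.
function rejectedRecords(records, response) {
  if (!response.FailedPutCount) return [];
  return records.filter((_, index) => {
    const result = response.RequestResponses[index];
    return Boolean(result && result.ErrorCode);
  });
}
```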

Understand how to easily forward streams across firehose


I dug into the code and read the README a few times, trying to understand the best strategy for writing data into different Redshift tables without duplicating everything (Kinesis, Lambda, Firehose...).

I have one Redshift setup for non-production and one for the production environment.

Is there a way to easily forward data to the right Firehose?


Code throws exception in line 450

When running the code in Lambda, it throws an exception at line 450 saying
"undefined is not a function" on this line:
"if (event.Records[0].eventSource === KINESIS_SERVICE_NAME || event.Records[0].eventSource === DDB_SERVICE_NAME)"

