Comments (5)
Hi,
I had help from AWS support and modified their code to suit my use case. It is in Python:
from __future__ import print_function
import base64

print('Loading function')

def lambda_handler(event, context):
    output = []
    succeeded_record_cnt = 0
    failed_record_cnt = 0
    for record in event['records']:
        print(record['recordId'])
        # Firehose hands each record's data to the Lambda base64-encoded
        payload = base64.b64decode(record['data']).decode('utf-8')
        print(payload)
        # Append a newline so each object lands on its own line in S3
        opstr = payload + '\n'
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            # Re-encode the transformed data before handing it back
            'data': base64.b64encode(opstr.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
        succeeded_record_cnt += 1
    print('Processing completed. Successful records {}, Failed records {}.'
          .format(succeeded_record_cnt, failed_record_cnt))
    return {'records': output}
In my scenario I needed a line break, so I added '\n' to the end of each object. From my understanding, the transformed data is base64-encoded, hence we need to decode it, add the delimiter, and then re-encode it before returning it to Kinesis Analytics. I do not have any error handling in it, but feel free to edit and post an improved version here if you can write one up.
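For a quick local sanity check, something like this exercises the handler the way Firehose would (the sample payloads are made up, not from the original post):

import base64

# Wrap two hypothetical payloads the way Firehose presents them to the Lambda.
event = {
    'records': [
        {'recordId': str(i),
         'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')}
        for i, payload in enumerate(['{"a": 1}', '{"b": 2}'])
    ]
}
print(lambda_handler(event, None))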
Hello @akashbiz
You should look at:
- https://github.com/awslabs/lambda-streams-to-firehose/blob/08014b8f6514cf84d91e2ad7ce8f36b1b5a86af4/README.md#optional-data-transformation
- lambda-streams-to-firehose/transformer.js, lines 45 to 61 (commit 4b28aa1)
Feel free to add a test to verify your regex: https://github.com/awslabs/lambda-streams-to-firehose/blob/master/test/testInputs.test.js
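The transformer hook itself is Node.js, but the idea is simple; here is a minimal Python sketch of the same filter-by-regex approach (the pattern below is a hypothetical example, not from the repo):

import base64
import re

# Hypothetical pattern: keep only INSERT events from the DynamoDB stream.
KEEP_PATTERN = re.compile(r'"eventName":\s*"INSERT"')

def filter_records(records):
    # Decode each record's base64 payload and keep only matching ones.
    kept = []
    for record in records:
        payload = base64.b64decode(record['data']).decode('utf-8')
        if KEEP_PATTERN.search(payload):
            kept.append(payload)
    return kept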
Hello @IanMeyers @benoittgt, thanks for the quick response. I think I am missing some configuration in the Firehose delivery stream.
When the DynamoDB stream records reach my Lambda, I receive the following event:
{ invocationId: '0b214ec1-6b67-4c78-8881-9b3998555205', deliveryStreamArn: 'arn:aws:firehose:us-east-1:xxxxxxxx:deliverystream/<streamName>', region: 'us-east-1', records: [ { recordId: '49575469680524135041586805764649280618633657491608567810', approximateArrivalTimestamp: 1501146562192, data: 'eyJLZXlzIjp7IkRldmljZUlEIjp7IlMiOiJEQVZJUy1NLTIwLVcifSwiVGltZXN0YW1wIjp7IlMiOiIxNTAxMTQ2NTYwODI5In19LCJOZXdJbWFnZSI6eyJUZW1wZXJhdHVyZSI6eyJTIjoiNjMuMTM5OTk5OTk5OTk5OTkifSwiRGV2aWNlSUQiOnsiUyI6IkRBVklTLU0tMjAtVyJ9LCJQcmVzc3VyZSI6eyJTIjoiMTMyLjg0In0sIlRpbWVzdGFtcCI6eyJTIjoiMTUwMTE0NjU2MDgyOSJ9fSwiU2VxdWVuY2VOdW1iZXIiOiI0MDIxMTAyMDAwMDAwMDAwMDI0MDI0MDA1NzgiLCJTaXplQnl0ZXMiOjEyNiwiQXBwcm94aW1hdGVDcmVhdGlvbkRhdGVUaW1lIjoxNTAxMTQ2NTQwLCJldmVudE5hbWUiOiJJTlNFUlQifQo=' }, { recordId: '49575469680524135041586805770929650251531656329085059074', approximateArrivalTimestamp: 1501146564204, data: 'eyJLZXlzIjp7IkRldmljZUlEIjp7IlMiOiJCSVo0SU5URUxMSUEtTElCMDIifSwiVGltZXN0YW1wIjp7IlMiOiIxNTAxMTQ2NTYzMTg4In19LCJOZXdJbWFnZSI6eyJDb2xpZm9ybUJhY3RlcmlhIjp7IlMiOiIzNiJ9LCJDeWFub0JhY3RlcmlhIjp7IlMiOiIyMDg0MSJ9LCJUZW1wZXJhdHVyZSI6eyJTIjoiODAifSwiRGV2aWNlSUQiOnsiUyI6IkJJWjRJTlRFTExJQS1MSUIwMiJ9LCJBbGthbGluaXR5Ijp7IlMiOiIyMzUifSwiVGltZXN0YW1wIjp7IlMiOiIxNTAxMTQ2NTYzMTg4In0sIkRlcHRoIjp7IlMiOiIyMCJ9LCJFQyI6eyJTIjoiMCJ9fSwiU2VxdWVuY2VOdW1iZXIiOiI0MDIxMTAzMDAwMDAwMDAwMDI0MDI0MDE1ODciLCJTaXplQnl0ZXMiOjE2OCwiQXBwcm94aW1hdGVDcmVhdGlvbkRhdGVUaW1lIjoxNTAxMTQ2NTQwLCJldmVudE5hbWUiOiJJTlNFUlQifQo=' } ] }
Then the transformation function configured on the delivery stream gives me this output:
[ { recordId: '49575469680524135041586805764649280618633657491608567810', result: 'Ok', data: 'eyJLZXlzIjp7IkRldmljZUlEIjp7IlMiOiJEQVZJUy1NLTIwLVcifSwiVGltZXN0YW1wIjoiMDcuMjcuMjAxNyAwOTowOToyMCJ9LCJOZXdJbWFnZSI6eyJUZW1wZXJhdHVyZSI6eyJTIjoiNjMuMTM5OTk5OTk5OTk5OTkifSwiRGV2aWNlSUQiOnsiUyI6IkRBVklTLU0tMjAtVyJ9LCJQcmVzc3VyZSI6eyJTIjoiMTMyLjg0In0sIlRpbWVzdGFtcCI6IjA3LjI3LjIwMTcgMDk6MDk6MjAifSwiU2VxdWVuY2VOdW1iZXIiOiI0MDIxMTAyMDAwMDAwMDAwMDI0MDI0MDA1NzgiLCJTaXplQnl0ZXMiOjEyNiwiQXBwcm94aW1hdGVDcmVhdGlvbkRhdGVUaW1lIjoxNTAxMTQ2NTQwLCJldmVudE5hbWUiOiJJTlNFUlQifQ==' }, { recordId: '49575469680524135041586805770929650251531656329085059074', result: 'Ok', data: 'eyJLZXlzIjp7IkRldmljZUlEIjp7IlMiOiJCSVo0SU5URUxMSUEtTElCMDIifSwiVGltZXN0YW1wIjoiMDcuMjcuMjAxNyAwOTowOToyMyJ9LCJOZXdJbWFnZSI6eyJDb2xpZm9ybUJhY3RlcmlhIjp7IlMiOiIzNiJ9LCJDeWFub0JhY3RlcmlhIjp7IlMiOiIyMDg0MSJ9LCJUZW1wZXJhdHVyZSI6eyJTIjoiODAifSwiRGV2aWNlSUQiOnsiUyI6IkJJWjRJTlRFTExJQS1MSUIwMiJ9LCJBbGthbGluaXR5Ijp7IlMiOiIyMzUifSwiVGltZXN0YW1wIjoiMDcuMjcuMjAxNyAwOTowOToyMyIsIkRlcHRoIjp7IlMiOiIyMCJ9LCJFQyI6eyJTIjoiMCJ9fSwiU2VxdWVuY2VOdW1iZXIiOiI0MDIxMTAzMDAwMDAwMDAwMDI0MDI0MDE1ODciLCJTaXplQnl0ZXMiOjE2OCwiQXBwcm94aW1hdGVDcmVhdGlvbkRhdGVUaW1lIjoxNTAxMTQ2NTQwLCJldmVudE5hbWUiOiJJTlNFUlQifQ==' } ]
Then I checked the records written to S3: they are in object format, but the objects are not comma-separated. Can you help me figure out what I am doing wrong?
Can you provide an example of what ends up in S3? The examples above are just representations of the Kinesis records and are still base64-encoded.
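For reference, a quick way to peek inside one of those records (a minimal sketch; pass it any element of the records array above):

import base64
import json

def show_record(record):
    # Decode the base64 'data' field and pretty-print the JSON inside.
    payload = base64.b64decode(record['data']).decode('utf-8')
    print(json.dumps(json.loads(payload), indent=2))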
Sure, here is the object that gets saved in S3:
{ "Keys": { "DeviceID": "sample-4564", "Timestamp": "08.02.2017 09:56:07" }, "NewImage": { "DeviceID": "sample-4564", "Latitude": "11.6021536", "Longitude": "-12.7415883", "Timestamp": "08.02.2017 09:56:07" }, "SequenceNumber": "432312800000000007978172699", "SizeBytes": 132, "ApproximateCreationDateTime": 1501667760, "eventName": "INSERT" }