Comments (17)

brentnash commented on July 30, 2024

The other thing to consider is that you'll get relatively few duplicates while running normally, but the two places I see large numbers of duplicates are:

  1. Lambda/KCL failure - if your consumer reading from your Kinesis stream happens to process part of a batch, transmit it and die/crash before it checkpoints, you can end up getting a chunk of duplicates when it starts up again.

  2. Backfilling - if you ever have the need to replay old data (recovering from a crash or user error for instance), having a system that can drop duplicates makes this process MUCH easier. You can just replay data from the last N hours or days and not have to worry about introducing even more duplicates.

That being said, you're right, I don't think Firehose is a great fit for this at the moment. It's a lot of effort for potentially not much gain like you said. If you really want to use Firehose to write all the way to Redshift, then you're either going to have to do something complicated like you mentioned above or you'll have to live with the dupes. I'll try to find the actual SQL I use tomorrow to compare against what you posted above, but yours looks about like what I'd expect with the SELECT DISTINCT and the LEFT JOIN on id (which I assume is your unique ID).

If you're not concerned about #2 I mentioned above (i.e. you only care about duplicates that happen within a limited time window), you could also consider using the new Firehose embedded Lambda function feature (https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html) to do some clever deduping. Maybe you could have it store the last hour's or day's worth of unique event IDs it has seen in a DynamoDB table and try to dedupe that way. I haven't tried it...just an idea.

Good luck!

benoittgt commented on July 30, 2024

For example, I have this S3 file:

2535270|2017-01-02 09:23:26|115424|19|native|696|app|89351|114020006
2535271|2017-01-02 09:23:27|193647|19|native|2887|app|191442|124000005
2535272|2017-01-02 09:23:30|115424|19|native|696|app|89351|114020006
2535273|2017-01-02 09:23:30|83137|19|native|696|app|62830|114020006
2535274|2017-01-02 09:23:38|120666|19|native|696|app|134184|114020006
2535276|2017-01-02 09:23:41|193734|19|native|696|app|62414|114020006
2535275|2017-01-02 09:23:41|66597|17|native|696|app|64801|114020006
2535277|2017-01-02 09:23:41|58405|19|native|696|app|64047|114020006
2535278|2017-01-02 09:23:45|68352|17|native|696|app|72727|114020006
2535279|2017-01-02 09:23:46|169893|19|native|696|app|63385|114020006
2535280|2017-01-02 09:23:47|133019|19|native|696|app|91773|114020006
2535281|2017-01-02 09:23:47|110088|17|native|696|app|63424|114020006 <<<
2535282|2017-01-02 09:23:47|68352|17|native|696|app|72727|114020006
2535281|2017-01-02 09:23:47|110088|17|native|696|app|63424|114020006 <<< 
2535282|2017-01-02 09:23:47|68352|17|native|696|app|72727|114020006
2535283|2017-01-02 09:23:48|166255|19|native|696|app|62825|114020006
2535284|2017-01-02 09:23:49|166255|19|native|696|app|62825|114020006
2535285|2017-01-02 09:23:49|142708|19|native|696|app|115740|114020006
2535286|2017-01-02 09:23:49|50296|19|native|696|app|29697|114020006
2535287|2017-01-02 09:23:52|193734|19|native|696|app|62414|114020006
2535288|2017-01-02 09:23:52|80034|19|native|696|app|73788|114020006
2535289|2017-01-02 09:23:53|88816|19|native|696|app|100378|114020006
2535290|2017-01-02 09:23:53|193734|19|native|696|app|62414|114020006
2535291|2017-01-02 09:23:59|124061|19|native|696|app|62475|114020006
2535292|2017-01-02 09:24:01|115424|19|native|696|app|89351|114020006
2535293|2017-01-02 09:24:04|50490|19|native|696|app|53979|114020006
2535294|2017-01-02 09:24:10|69294|19|native|696|app|74189|114020006
2535295|2017-01-02 09:24:15|50111|17|native|696|app|43511|114020006
2535296|2017-01-02 09:24:17|115424|19|native|696|app|89351|114020006
2535298|2017-01-02 09:24:17|56702|19|native|696|app|43923|114020006
2535297|2017-01-02 09:24:17|69063|19|native|696|app|72232|114020006
2535299|2017-01-02 09:24:22|188513|19|native|696|app|174704|114020006
2535300|2017-01-02 09:24:23|188513|19|native|696|app|174704|114020006

The line starting with 2535281 is inserted twice in S3 (the 2535282 line that follows it is duplicated as well).

IanMeyers commented on July 30, 2024

Unfortunately, because Kinesis provides an at-least-once delivery semantic, you are unable to suppress duplicates unless you track IDs using a secondary mechanism (which can then move you to 'at-most-once' delivery semantics, which opens the possibility of data loss). It is best to leave the duplicate records in your delivery destination in S3 and deduplicate them within the analysis system you are using.
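
For example, "deduplicate within the analysis system" can be as simple as deduplicating at query time. A minimal sketch in Redshift, assuming a table named events with a unique id column and an event_ts timestamp (names invented for illustration):

-- Keep exactly one row per id at analysis time; the raw data in S3/Redshift
-- can still contain the duplicates that at-least-once delivery produces.
SELECT *
FROM (
  SELECT e.*,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY event_ts) AS rn
  FROM events e
) deduped
WHERE rn = 1;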

benoittgt commented on July 30, 2024

Thanks a lot for your answer @IanMeyers. It's much clearer.

What do you mean by "deduplicate them within the analysis system you are using"?

Thanks again Ian

benoittgt commented on July 30, 2024

Will probably follow Brent Nash's approach, even though I haven't yet found a way to trigger all of this using this lambda and Firehose.

We first load telemetry data into a temporary staging table and then perform a modified merge operation to remove duplicates. We use a SELECT DISTINCT query and then LEFT JOIN our staging table against our destination table on event_id to get rid of duplicate records that may have been introduced by Amazon Kinesis-related retries or during backfills of old data. We also make sure to load our data in sort key order to reduce or even eliminate the need for VACUUM operations.

From: https://aws.amazon.com/blogs/big-data/building-an-event-based-analytics-pipeline-for-amazon-game-studios-breakaway/

Closing the issue because it's not related to the lambda itself.

IanMeyers commented on July 30, 2024

Yes, Brent's approach is one correct way to merge new data that may contain duplicates. Thanks!

brentnash commented on July 30, 2024

Hey @benoittgt ,

I'm on vacation in the mountains at the moment, so my internet is spotty, but let me try to share a few details.

As Ian mentioned, Kinesis has "at least once" semantics, so you can get the occasional duplicate. In my experience, they're mostly due to producer retries. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-duplicates.html for more details.

In the system(s) I've built, we do what Ian mentioned: we archive all events (including duplicates) to S3 as sort of the "raw" record of what was received. Deduplication happens when we're moving data from Kinesis or S3 into destination data stores like Redshift or ElasticSearchService.

The basic gist of it is that the producer (i.e. the thing sending data to Kinesis) generates a unique v4 UUID (an "event_id") and sends it as part of every event. This is the field that can be used to deduplicate.

When loading into Redshift, we use a temporary staging table and then perform a LEFT JOIN against the destination table(s), taking only the event_ids that don't already exist in the destination table(s).

In the case of ElasticSearchService, you can do a similar thing by using the event_id as the id in the standard index/type/id mapping ESS provides (this actually results in duplicates just getting overwritten with the same record, but has the effect of deduping).

You can come up with similar mechanisms for other data stores as well.

Hope that helps and let me know if you need any further details.

benoittgt commented on July 30, 2024

Thanks a lot @brentnash. Sorry to bother you during your vacation.

"Deduplication happens when we're moving data from Kinesis"

Maybe it's a secret, but how do you move it? A Lambda with a cron job? Firehose?

I'm using Firehose for the moment to copy from S3 to Redshift. I will probably have Firehose COPY the data into a staging table and then do as you do (remove duplicates, reorder, and then merge) with a Lambda scheduled by a cron job. I still have to figure out how to properly clean the staging table after a merge without disturbing the COPY command that Firehose issues. I don't know yet.
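
For context, the COPY into the staging table would look roughly like the sketch below. This is only an illustration: the bucket, prefix and IAM role are placeholders, and in practice Firehose issues its own COPY with whatever options are configured on the delivery stream.

COPY active_connections_temp
FROM 's3://my-firehose-bucket/2017/01/02/'
IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-copy-role'
DELIMITER '|';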

benoittgt commented on July 30, 2024

I got another answer at: http://disq.us/p/1eyg90w

I will add this mechanism tomorrow and will publish a blog post about the implementation next month.

Thanks again to both of you.

benoittgt commented on July 30, 2024

@brentnash The only issue I will have with keeping this lambda while Firehose copies into Redshift is that the staging table will constantly receive data. I will probably use two staging tables and:

  1. rename both tables so there is always one table that can receive data from the Firehose COPY (staging_table) while the other holds previously received records being processed (processing_table)
  2. process and merge the data into the final table
  3. empty "processing_table" and swap the names again

That looks quite complicated for a few duplicates, but I think it's the best approach.

benoittgt commented on July 30, 2024

I have something similar to this:

BEGIN;
ALTER TABLE active_connections_temp
RENAME TO active_connections_process;
CREATE TABLE active_connections_temp (LIKE active_connections_process);
COMMIT;

BEGIN;
INSERT INTO active_connections_final
SELECT DISTINCT active_connections_process.*
FROM active_connections_process
LEFT JOIN active_connections_final ON active_connections_final.id = active_connections_process.id
WHERE active_connections_final.id IS NULL
ORDER BY active_connections_process.id;
DROP TABLE active_connections_process;
COMMIT;

benoittgt commented on July 30, 2024

Thanks @brentnash for your answer. We now have a Lambda running in production with a staging table.

The code looks like:

'use strict';
const config = require('./redshift_config_from_env');
// Note the ':' between user and password; without it the connection string is invalid.
const redshiftConn = `pg://${config.user}:${config.password}@${config.host}/${config.database}`;
const pgp = require('pg-promise')();

// Rename the live staging table to <table>_process and recreate an empty
// <table>_temp so the Firehose COPY can keep landing data while we merge.
const tableCopyQuery = function(tableName) {
  return `
      ALTER TABLE ${tableName}_temp
      RENAME TO ${tableName}_process;
      CREATE TABLE ${tableName}_temp (LIKE ${tableName}_process);`;
};

// Insert only the rows whose id is not already in the destination table,
// in sort key order, then drop the processed staging table.
const insertQuery = function(tableName) {
  return `
      INSERT INTO ${tableName}
      SELECT DISTINCT ${tableName}_process.*
        FROM ${tableName}_process
        LEFT JOIN ${tableName} USING (id)
       WHERE ${tableName}.id IS NULL
       ORDER BY ${tableName}_process.id;
      DROP TABLE ${tableName}_process;`;
};

exports.handler = function(event, context) {
  const client = pgp(redshiftConn);

  // Run both swap/merge pairs inside a single transaction.
  return client.tx(function (t) {
    return t.batch([
      t.none(tableCopyQuery('user_stats')),
      t.none(insertQuery('user_stats')),
      t.none(tableCopyQuery('admin_stats')),
      t.none(insertQuery('admin_stats'))
    ]);
  })
    .then(function () {
      return context.succeed('Successfully merged.');
    })
    .catch(function (error) {
      return context.fail(`Failed to run queries: ${JSON.stringify(error)}`);
    });
};

For the moment it's working, but I will wait a few days to be sure.

brentnash commented on July 30, 2024

Hey @benoittgt,

Just to follow up, I checked my merge SQL and it looks pretty similar to yours. The only differences I see are:

  1. We dedupe on the combination of "id" and "timestamp" rather than just "id" to cover the unlikely scenario that an ID repeats (see the sketch after this list). This is probably unnecessary in most cases.

  2. We use time-series tables in Redshift to keep down the size (and need for vacuums) on our destination tables so we actually end up doing multiple merges (staging to November events, staging to December events, etc.). Probably not necessary in your use case unless you decide to go the time-series route.
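
For point 1, the only change to the merge shown earlier is joining on both columns. A rough sketch, reusing the user_stats naming from the Lambda above and assuming the second column is literally called "timestamp" (quoted because it is a reserved word in Redshift):

-- Anti-join on both id and timestamp so a repeated id with a different
-- timestamp is still treated as a distinct event.
INSERT INTO user_stats
SELECT DISTINCT p.*
  FROM user_stats_process p
  LEFT JOIN user_stats ON user_stats.id = p.id
                      AND user_stats."timestamp" = p."timestamp"
 WHERE user_stats.id IS NULL
 ORDER BY p.id;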

One other thought is that you may want to check what the PRIMARY KEY/SORT KEY/DIST KEY are set to on your staging table. If your "id" is not part of those, your merges may take longer than necessary. Though since you're using CREATE TABLE ... LIKE ... you might not have a choice, since you'll inherit those values from your parent table.
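
If you do control the staging table's definition (rather than inheriting it via CREATE TABLE ... LIKE ...), a sketch of declaring the join key as both dist key and sort key might look like this; the column list is invented for illustration:

CREATE TABLE user_stats_temp (
  id         VARCHAR(36) NOT NULL,  -- the event UUID used for deduplication
  created_at TIMESTAMP,
  payload    VARCHAR(MAX)           -- remaining event columns go here
)
DISTKEY (id)
SORTKEY (id);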

Glad to hear it seems to be working for you!

benoittgt commented on July 30, 2024

Hello @brentnash

  1. The id is a UUID generated by the backend, so it's unique.
  2. I hadn't thought about time-series tables; we're not using them, so the merges will be simpler :)

Thanks a lot! Two days in and it's still working as intended.

benoittgt commented on July 30, 2024

We eventually ran into occasional "Serializable isolation violation on table" errors from Redshift on other insert queries. We ended up removing the Redshift delivery from Firehose and letting it perform only the S3 delivery. The COPY from S3 into a temp table and the INSERT into the final table are done in a single transaction by a Lambda.

Also, every insert transaction locks the Redshift table before doing anything. It works quite well (except for #37).

It took some time, but we finally have a solution that is easy to debug and very efficient.
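
A rough sketch of that single transaction (bucket and role are placeholders; the table names follow the earlier comments):

BEGIN;
-- take the locks up front so concurrent inserts cannot interleave
LOCK active_connections_temp, active_connections_final;

-- land the new S3 batch in the temp table
COPY active_connections_temp
FROM 's3://my-firehose-bucket/2017/01/02/'
IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-copy-role'
DELIMITER '|';

-- merge only ids that are not already in the final table, in sort key order
INSERT INTO active_connections_final
SELECT DISTINCT t.*
  FROM active_connections_temp t
  LEFT JOIN active_connections_final f ON f.id = t.id
 WHERE f.id IS NULL
 ORDER BY t.id;

-- DELETE rather than TRUNCATE: TRUNCATE commits implicitly in Redshift
DELETE FROM active_connections_temp;
COMMIT;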

Thanks again for the help.

benoittgt commented on July 30, 2024

My team and I published two blog posts about our migration. You are mentioned! Thanks again for your help.

https://medium.com/appaloosa-store-engineering/migrating-our-analytics-stack-from-mongodb-to-aws-redshift-334230d9ef7e

https://medium.com/appaloosa-store-engineering/from-mongodb-to-aws-redshift-a-practical-guide-5ec8ee8fb147
