
pg2k4j's Introduction


This project is no longer actively maintained by Disney Streaming Services

You are welcome to use and fork this project, and we may occasionally review and merge pull requests submitted by contributors or dependabot/Snyk. However, we no longer use this internally, so support will be limited.

pg2k4j

Postgresql To Kinesis For Java

A tool for publishing inserts, updates, and deletes made on a PostgreSQL database to an Amazon Kinesis stream. pg2k4j can be run as a stand-alone application from the command line, or used as a Java library whose functionality can be extended and customized.

Getting Started

First, set up your Postgres database to support logical replication and create an AWS Kinesis stream (see Configuring Infrastructure below).

Run pg2k4j as a Stand-alone Application

Install Docker and log in with your Docker credentials to gain access to the pg2k4j Docker repository. Then run the command below.

docker run -v /path/to/.aws/creds/:/aws_creds \
  disneystreaming/pg2k4j \
  --awsconfiglocation=/aws_creds --awsprofile=default \
  --pgdatabase=<your_postgres_db> --pghost=<your_postgres_host> --pguser=<your_postgres_user> --pgpassword=<your_postgres_pw> \
  --streamname=<your_kinesis_streamname>

When you see the log line below, pg2k4j is ready to publish changes to Kinesis.

[main] INFO com.disneystreaming.pg2k4j.SlotReaderKinesisWriter - Consuming from slot pg2k4j

Use as a Java Library

pg2k4j artifacts are published to Maven Central.

Maven
<dependency>
    <groupId>com.disneystreaming.pg2k4j</groupId>
    <artifactId>pg2k4j</artifactId>
    <version>LATEST</version>
</dependency>
Gradle
implementation group: 'com.disneystreaming.pg2k4j', name: 'pg2k4j', version: 'LATEST'
SBT
libraryDependencies += "com.disneystreaming.pg2k4j" % "pg2k4j" % "LATEST"

To initialize and begin publishing database changes to Kinesis, create a SlotReaderKinesisWriter and call its runLoop method.

Why We Wrote pg2k4j

pg2k4j implements a powerful design pattern called Change Data Capture (CDC). With pg2k4j, anyone consuming from the Kinesis stream can follow the state of your database as it changes. At DSS we have rapidly changing datasets that many teams need to access and process in their own way. pg2k4j removes the need to grant database access to each team or to stand up an API on top of the dataset. This keeps read load off your database, leaving its capacity free for write throughput.

Benefits Over Existing Solutions

Before writing pg2k4j, we explored existing solutions. We used pg2kinesis but found that it simply couldn't keep up with the write throughput we required. As a JVM app, pg2k4j can natively integrate with Amazon's Kinesis Producer Library (KPL), allowing it to achieve write speeds of over 1 million records per minute, orders of magnitude faster than what we observed with its Python counterpart.

How it Works

1. pg2k4j opens a logical replication slot on the PostgreSQL database.

A replication slot streams changes made on the database to the slot's listener, in the format produced by the output plugin configured for that slot. By default pg2k4j uses the wal2json plugin, which sends a JSON representation of a SlotMessage to the listening thread. Postgres writes all data changes to the Write-Ahead Log (WAL), which, besides ensuring data integrity and crash safety, is what makes logical replication possible. Each replication slot maintains a pointer to a position in the WAL (a log sequence number, or LSN) marking the last change the slot has processed. Postgres can remove only the WAL that precedes this position. Crucially, if the application consuming the replication slot never advances this sequence number, the database's disk will fill up, because Postgres cannot remove any of the WAL the slot still holds. To view this sequence number, run the query below on your database.

SELECT * FROM pg_replication_slots;
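A useful check on that view is how many bytes of WAL a slot is holding back. On PostgreSQL 10 and later the server can compute this itself with pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn). The Java sketch below does the same arithmetic client-side on the textual pg_lsn values, which are written as two hexadecimal numbers separated by a slash (the upper and lower 32 bits of a 64-bit byte position). The class and method names are hypothetical and not part of pg2k4j.

```java
/** Hypothetical helper (not part of pg2k4j) for comparing textual pg_lsn values. */
final class LsnMath {
    private static final long MAX_HALF = 0xFFFF_FFFFL;

    private LsnMath() {}

    /** Parses a pg_lsn such as "16/B374D848" into a 64-bit WAL byte position. */
    static long parseLsn(String lsn) {
        int slash = lsn.indexOf('/');
        if (slash <= 0 || slash == lsn.length() - 1) {
            throw new IllegalArgumentException("Not a pg_lsn value: " + lsn);
        }
        // Upper and lower 32 bits are written as separate hexadecimal numbers.
        long high = Long.parseLong(lsn.substring(0, slash), 16);
        long low = Long.parseLong(lsn.substring(slash + 1), 16);
        if (high < 0 || high > MAX_HALF || low < 0 || low > MAX_HALF) {
            throw new IllegalArgumentException("LSN half out of 32-bit range: " + lsn);
        }
        return (high << 32) | low;
    }

    /** Bytes of WAL between two positions: the same arithmetic as pg_wal_lsn_diff(current, slot). */
    static long bytesBehind(String currentLsn, String slotLsn) {
        return parseLsn(currentLsn) - parseLsn(slotLsn);
    }

    public static void main(String[] args) {
        // e.g. pg_current_wal_lsn() compared with a slot's restart_lsn
        System.out.println(bytesBehind("16/B374D848", "16/B3000000")); // prints 7657544
    }
}
```

In this example the slot is about 7.3 MiB behind; a value that keeps growing while the slot is in use suggests its consumer is not advancing the sequence number.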

Details of how pg2k4j manages this pointer are outlined later in this section.

2. pg2k4j deserializes the JSON output sent by the wal2json plugin into a SlotMessage.

The deserialization method should be overridden when using any plugin other than wal2json, since that plugin's output would not be a JSON representation of a SlotMessage.

3. pg2k4j writes these contents to the Kinesis stream.

First, the SlotMessage is turned into a Stream of UserRecord objects; these UserRecords are then written to the stream with a callback attached that is invoked once the write completes.

4. The callback is invoked when the records either reach the stream or fail to.

On a successful write to the stream, pg2k4j advances the replication slot's sequence number, indicating that the database may discard any WAL before this point. Because the sequence number is advanced only after confirmation that the record arrived on the stream, pg2k4j guarantees that each data change reaches Kinesis at least once. This guarantee holds even if Postgres or pg2k4j restarts.
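The "advance only after confirmation" rule can be illustrated with a small tracker. This is a hypothetical sketch, not pg2k4j's implementation: CompletableFuture stands in for the Guava ListenableFuture that the KPL's addUserRecord returns, and the sketch assumes that one slot message may produce several records and that writes may complete out of order. Under those assumptions it only moves the position past an LSN once every write for that LSN, and for every earlier LSN, has been confirmed.

```java
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Hypothetical tracker: advances the slot position only over fully confirmed writes. */
final class SlotPositionTracker {
    // LSN of each slot message -> number of its Kinesis writes not yet confirmed
    private final TreeMap<Long, Integer> outstanding = new TreeMap<>();
    private long confirmedLsn;

    SlotPositionTracker(long startLsn) {
        this.confirmedLsn = startLsn;
    }

    /** Tracks all writes produced from the slot message at the given LSN. */
    void track(long lsn, List<? extends CompletableFuture<?>> writes) {
        synchronized (this) {
            outstanding.merge(lsn, writes.size(), Integer::sum);
            drain(); // a message that produced no records is trivially confirmed
        }
        for (CompletableFuture<?> write : writes) {
            write.whenComplete((result, error) -> {
                if (error == null) {
                    confirm(lsn);
                }
                // On failure the LSN stays outstanding, so the position never moves past it.
            });
        }
    }

    /** The position that would be reported to Postgres as safe to discard up to. */
    synchronized long confirmedLsn() {
        return confirmedLsn;
    }

    private synchronized void confirm(long lsn) {
        outstanding.computeIfPresent(lsn, (key, remaining) -> remaining - 1);
        drain();
    }

    // Advances over the longest prefix of LSNs whose writes are all confirmed.
    // Only called while holding this object's lock.
    private void drain() {
        while (!outstanding.isEmpty() && outstanding.firstEntry().getValue() == 0) {
            confirmedLsn = outstanding.pollFirstEntry().getKey();
        }
    }

    public static void main(String[] args) {
        SlotPositionTracker tracker = new SlotPositionTracker(100L);
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        tracker.track(200L, List.of(first));
        tracker.track(300L, List.of(second));
        second.complete("ok");                      // the later write is confirmed first
        System.out.println(tracker.confirmedLsn()); // prints 100
        first.complete("ok");
        System.out.println(tracker.confirmedLsn()); // prints 300
    }
}
```

Holding back on out-of-order confirmations costs a little retained WAL but never reports a position whose earlier changes might still be lost.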

There is one other scenario in which pg2k4j advances the sequence number. Each Postgres instance may host many databases, but a replication slot is configured against a single database, while the WAL is shared by the whole instance. If the replication slot's database is idle but other databases are active, pg2k4j must still advance its pointer into the WAL so that Postgres doesn't hold on to those sections of the WAL. That's why pg2k4j advances the sequence number after a period of inactivity, which defaults to 5 minutes.
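The inactivity rule can be sketched as a small timer. This is a hypothetical helper, not pg2k4j's code: only the 5-minute default comes from the text above. The names, the use of a monotonic System.nanoTime-style clock, and what the caller does when shouldAdvance() returns true (for example, confirming the latest WAL position the server has reported) are assumptions.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical timer deciding when an idle slot's position should be advanced anyway. */
final class IdleAdvanceTimer {
    private final LongSupplier nanoClock; // monotonic time source, e.g. System::nanoTime
    private final long idleTimeoutNanos;
    private long lastActivityNanos;

    IdleAdvanceTimer(LongSupplier nanoClock, Duration idleTimeout) {
        this.nanoClock = nanoClock;
        this.idleTimeoutNanos = idleTimeout.toNanos();
        this.lastActivityNanos = nanoClock.getAsLong();
    }

    /** Call whenever a change arrives from the slot or its position is advanced. */
    synchronized void markActivity() {
        lastActivityNanos = nanoClock.getAsLong();
    }

    /** True once no activity has been recorded for at least the idle timeout. */
    synchronized boolean shouldAdvance() {
        // Subtracting first keeps the comparison correct even if nanoTime wraps around.
        return nanoClock.getAsLong() - lastActivityNanos >= idleTimeoutNanos;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L}; // simulated clock so the example is deterministic
        IdleAdvanceTimer timer = new IdleAdvanceTimer(() -> fakeNow[0], Duration.ofMinutes(5));
        fakeNow[0] = Duration.ofMinutes(4).toNanos();
        System.out.println(timer.shouldAdvance()); // prints false
        fakeNow[0] = Duration.ofMinutes(6).toNanos();
        System.out.println(timer.shouldAdvance()); // prints true
    }
}
```

In a real process the clock would be System::nanoTime, which, unlike wall-clock time, is unaffected by system clock adjustments.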

Configuring Infrastructure

This section walks through creating a PostgreSQL instance on RDS configured for logical replication. pg2k4j by no means requires your PostgreSQL instance to be an RDS instance, but since Kinesis is an AWS product, many users will likely also be running Postgres on AWS. For an example of how to configure a non-RDS Postgres instance, refer to the integration tests. This section also walks through setting up a Kinesis stream, for which pg2k4j requires no special configuration.

Start AWS RDS Postgres

Create a Parameter Group

In the AWS console, navigate to RDS, then Parameter groups, and create a parameter group for the postgres10 family.

(Screenshot: Create Parameter Group)

In this parameter group, set the following values:

rds.logical_replication 1
max_wal_senders 10
max_replication_slots 10

Launch Instance

In the AWS console, navigate to RDS > Instances and select Launch Instance. Follow the creation wizard, selecting PostgreSQL as the DB engine and PostgreSQL 10.3-R1 as the DB engine version.

As shown below, associate the parameter group you created in the previous step with this instance.

(Screenshot: Associate Parameter Group)

Create Kinesis Stream

In the AWS console, navigate to Kinesis and create a stream.

(Screenshot: Create Kinesis Stream)

Contributing

Fork the repo and submit a PR whose description explains what the code does and which bug or feature it addresses. Any method containing substantial logic should include Javadoc.

Be sure that both integration and unit tests pass and that any new code has corresponding tests. Run unit tests with

mvn clean test
Tests run: 13, Failures: 0, Errors: 0, Skipped: 0

and integration tests with

mvn clean verify
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

Contributors must fill out a CLA before we can accept their contributions. See CLA-Individual or CLA-Corporate for details.

Releasing

Releasing is automatic and driven by tags. To release, simply tag the master branch with a semantic version, e.g. 1.0.0.

This updates the POM with the version, publishes to Maven, and builds and pushes the Docker image to Docker Hub.

pg2k4j's People

Contributors

dependabot[bot], dss-osc, lashford, markglh, rkass, snyk-bot


pg2k4j's Issues

Unable to use a role - assume role

Has anyone been able to get pg2k4j working with AWS ECS task roles? I can get it to work for the standard AWS tools inside my container, but I can't see how to make pg2k4j pick up the role.

On startup a replication slot was created but is being used by another process.

On an RDS Postgres instance that did not have replication enabled before using pg2k4j, I enabled replication and ran the pg2k4j CLI to connect to it. I verified that no pg2k4j replication slot existed before running the CLI. The CLI connects but errors out with the error shown below. It appears to have created the replication slot, but some other process has connected to it, so now pg2k4j can't connect to it. Any ideas?

I'm running on the disneystreaming/pg2k4j docker image that I pulled yesterday (10/7/2019)
Running against postgres version 10.6

[main] INFO com.disneystreaming.pg2k4j.PostgresConnector - Attempting to create replication slot pg2k4j
[main] INFO com.disneystreaming.pg2k4j.PostgresConnector - Slot pg2k4j already exists
[main] INFO com.disneystreaming.pg2k4j.PostgresConnector - Replication slot currently has another process consuming from it
[main] INFO com.disneystreaming.pg2k4j.PostgresConnector - Sleeping for 30 seconds before retrying 29 more times
org.postgresql.util.PSQLException: ERROR: replication slot "pg2k4j" is active for PID 7957
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
        at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1116)
        at org.postgresql.core.v3.QueryExecutorImpl.startCopy(QueryExecutorImpl.java:842)
        at org.postgresql.core.v3.replication.V3ReplicationProtocol.initializeReplication(V3ReplicationProtocol.java:58)
        at org.postgresql.core.v3.replication.V3ReplicationProtocol.startLogical(V3ReplicationProtocol.java:42)
        at org.postgresql.replication.fluent.ReplicationStreamBuilder$1.start(ReplicationStreamBuilder.java:38)
        at org.postgresql.replication.fluent.logical.LogicalStreamBuilder.start(LogicalStreamBuilder.java:37)
        at com.disneystreaming.pg2k4j.PostgresConnector.getPgReplicationStreamHelper(PostgresConnector.java:228)
        at com.disneystreaming.pg2k4j.PostgresConnector.getPgReplicationStream(PostgresConnector.java:184)
        at com.disneystreaming.pg2k4j.PostgresConnector.<init>(PostgresConnector.java:98)
        at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.createPostgresConnector(SlotReaderKinesisWriter.java:293)
        at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.readSlotWriteToKinesis(SlotReaderKinesisWriter.java:123)
        at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.runLoop(SlotReaderKinesisWriter.java:86)
        at com.disneystreaming.pg2k4j.CommandLineRunner.run(CommandLineRunner.java:30)
        at java.base/java.util.Optional.ifPresent(Optional.java:183)
        at com.disneystreaming.pg2k4j.CommandLineRunner.main(CommandLineRunner.java:45)

--kinesisendpoint only uses https

Even if I explicitly request http:

--kinesisendpoint=http://docker.for.mac.host.internal:4567

Running kinesalite with the --ssl option makes pg2k4j happy, but then my other libs are unhappy.

Thanks.

Data must be less than or equal to 1MB in size

This is the same as, or similar to, the closed issue #14, which I can't reopen myself.

In my use case I have a transaction that is larger than 1MB and this is causing pg2k4j to get stuck reading that same WAL message over and over. This causes my disk to fill up (because WAL is never successfully read) and the database crashes.

The recommendation in that issue was to set wal_writer_flush_after lower than 1mb and to set wal_writer_delay to something short.

I have set wal_writer_flush_after to 20 and set wal_writer_delay to 200. This does not resolve the problem. Do you have other suggestions?

A 1MB transaction is large, but is not an unreasonable use case. For example if I add a column to an existing table with a million records and then need to backfill data in that column in a transaction it would fail. Or if I had to bulk insert thousands of new members for my site, this would cause the same error. Because of this issue I can't use this software.

Here is my error:

2020-01-03T21:57:45.924263600Z [main] ERROR com.disneystreaming.pg2k4j.SlotReaderKinesisWriter - Received exception of type class java.lang.IllegalArgumentException
2020-01-03T21:57:45.924932900Z java.lang.IllegalArgumentException: Data must be less than or equal to 1MB in size, got 2346047 bytes
2020-01-03T21:57:45.924944900Z  at com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:517)
2020-01-03T21:57:45.924949500Z  at com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:406)
2020-01-03T21:57:45.924952800Z  at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.lambda$processByteBuffer$0(SlotReaderKinesisWriter.java:242)
2020-01-03T21:57:45.924956100Z  at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
2020-01-03T21:57:45.924959100Z  at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
2020-01-03T21:57:45.924970900Z  at java.base/java.util.stream.Streams$StreamBuilderImpl.forEachRemaining(Streams.java:411)
2020-01-03T21:57:45.924974400Z  at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
2020-01-03T21:57:45.924977500Z  at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
2020-01-03T21:57:45.924980500Z  at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
2020-01-03T21:57:45.924983600Z  at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
2020-01-03T21:57:45.924986600Z  at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
2020-01-03T21:57:45.924989600Z  at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
2020-01-03T21:57:45.924992700Z  at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.processByteBuffer(SlotReaderKinesisWriter.java:234)
2020-01-03T21:57:45.924995900Z  at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.readSlotWriteToKinesisHelper(SlotReaderKinesisWriter.java:195)
2020-01-03T21:57:45.924999700Z  at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.readSlotWriteToKinesis(SlotReaderKinesisWriter.java:131)
2020-01-03T21:57:45.925003500Z  at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.runLoop(SlotReaderKinesisWriter.java:86)
2020-01-03T21:57:45.925007300Z  at com.disneystreaming.pg2k4j.CommandLineRunner.run(CommandLineRunner.java:30)
2020-01-03T21:57:45.925011800Z  at java.base/java.util.Optional.ifPresent(Optional.java:183)
2020-01-03T21:57:45.925041500Z  at com.disneystreaming.pg2k4j.CommandLineRunner.main(CommandLineRunner.java:45)
2020-01-03T21:57:47.223098700Z [main] INFO com.disneystreaming.pg2k4j.PostgresConnector - Attempting to create replication slot pg2k4j
2020-01-03T21:57:47.271197900Z [main] INFO com.disneystreaming.pg2k4j.PostgresConnector - Slot pg2k4j already exists
2020-01-03T21:57:47.326846300Z [main] INFO com.amazonaws.services.kinesis.producer.KinesisProducer - Extracting binaries to /tmp/amazon-kinesis-producer-native-binaries
2020-01-03T21:57:47.819365800Z [main] INFO com.amazonaws.services.kinesis.producer.HashedFileCopier - '/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D' already exists, and matches.  Not overwriting.
2020-01-03T21:57:47.829184100Z [main] INFO com.disneystreaming.pg2k4j.SlotReaderKinesisWriter - Consuming from slot pg2k4j

To reproduce:

  1. Run pg2k4j against a database
  2. Create a table with the following DDL
CREATE TABLE public.my_test_table (
	id serial,
	"name" varchar(100) NOT NULL,
	property_1 varchar(200) not null,
	property_2 varchar(200) not null,
	property_3 varchar(200) not null,
	property_4 varchar(200) not null,
	property_5 varchar(200) not null,
	property_6 varchar(200) not null,
	property_7 varchar(200) not null,
	property_8 varchar(200) not null,
	property_9 varchar(200) not null,
	CONSTRAINT my_test_table_pkey PRIMARY KEY (id)
);
  3. Create an insert script similar to the following with 3500 inserts.
BEGIN;
INSERT INTO public.my_test_table
("name", property_1, property_2, property_3, property_4, property_5, property_6, property_7, property_8, property_9)
VALUES(concat(md5(random()::text), md5(random()::text)), concat(md5(random()::text), md5(random()::text)), concat(md5(random()::text), md5(random()::text)), concat(md5(random()::text), md5(random()::text)), concat(md5(random()::text), md5(random()::text)), concat(md5(random()::text), md5(random()::text)), concat(md5(random()::text), md5(random()::text)), concat(md5(random()::text), md5(random()::text)), concat(md5(random()::text), md5(random()::text)), concat(md5(random()::text), md5(random()::text)));
...
< add 3500 more of the previous insert statement here>
...
COMMIT;
  4. Run the insert script against the postgres database that is running pg2k4j
psql --host=my.ip.address --port=5447 --username=test_user --dbname=test_database --file "C:\dev\sql_scripts\my_temp_table.sql"
  5. Watch the logs on your pg2k4j container and wait for the transaction to finish. Once it is finished you will see the error.
docker logs reverent_pike --since 10m -t --follow
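The exception above is the KPL's addUserRecord rejecting a single user record whose data exceeds 1 MB (1024 × 1024 bytes). A minimal, hypothetical pre-submission check (not part of pg2k4j) makes the limit explicit, assuming the JSON payload is sent as UTF-8 bytes, so the byte length rather than the character count is what matters. What to do with an oversized record (split it, store it elsewhere, or alert) is a separate design decision; silently skipping it would break the delivery guarantee described in the README.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical size guard for Kinesis user records; not part of pg2k4j. */
final class RecordSizeGuard {
    // The KPL rejects user record data larger than this many bytes.
    static final int MAX_RECORD_BYTES = 1024 * 1024;

    private RecordSizeGuard() {}

    /** Encodes a JSON payload as UTF-8 bytes (assumed wire encoding). */
    static ByteBuffer encode(String json) {
        return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
    }

    /** True if the payload can be passed to addUserRecord without tripping the size check. */
    static boolean fitsInOneRecord(ByteBuffer data) {
        return data.remaining() <= MAX_RECORD_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(fitsInOneRecord(encode("{}"))); // prints true
        String tooBig = "a".repeat(MAX_RECORD_BYTES + 1);
        System.out.println(fitsInOneRecord(encode(tooBig))); // prints false
    }
}
```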

pg2k4j fails when `xid`s are between 32-bit int max and unsigned 32-bit int max

Postgres transaction IDs are of type xid, an unsigned 32-bit int. pg2k4j assumes they are signed 32-bit ints https://github.com/disneystreaming/pg2k4j/blob/master/src/main/java/com/disneystreaming/pg2k4j/models/SlotMessage.java#L35. This causes JSON parsing of WAL messages into SlotMessage to fail with com.fasterxml.jackson.core.JsonParseException: Numeric value (...) out of range of int when there's an xid between the signed 32-bit int max and the unsigned 32-bit int max.
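The range problem is easy to reproduce with the JDK alone: Integer.parseInt rejects any value above 2147483647, while an xid can go up to 4294967295. A likely fix is to hold the xid in a long (Jackson can bind such a number to a long field). The hypothetical helpers below, which are not part of pg2k4j, show two ways to parse an xid safely.

```java
/** Hypothetical helpers showing why an xid must not be stored in a Java int. */
final class XidParsing {
    // Postgres xid is an unsigned 32-bit integer.
    static final long MAX_XID = 0xFFFF_FFFFL;

    private XidParsing() {}

    /** Parses a decimal xid into a long, rejecting values outside the unsigned 32-bit range. */
    static long parseXid(String text) {
        long value = Long.parseLong(text);
        if (value < 0 || value > MAX_XID) {
            throw new IllegalArgumentException("xid out of range: " + text);
        }
        return value;
    }

    /** Same result via the JDK's unsigned-int helpers, for code that must keep an int field. */
    static long parseXidViaUnsignedInt(String text) {
        int bits = Integer.parseUnsignedInt(text); // throws if above 4294967295
        return Integer.toUnsignedLong(bits);
    }

    public static void main(String[] args) {
        System.out.println(parseXid("3000000000")); // prints 3000000000
        try {
            Integer.parseInt("3000000000"); // what a signed int field effectively does
        } catch (NumberFormatException e) {
            System.out.println("int overflow: " + e.getMessage());
        }
    }
}
```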

Running local Kinesis still requires AWS access

Even when using local Kinesis (Kinesalite) and the --kinesisendpoint option, pg2k4j still requires I put in AWS creds and it makes AWS API calls:

[kpl-daemon-0003] WARN com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2019-09-07 18:32:42.777668] [0x000005b2][0x00007fe2cb436700] [warning] [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError
AccessDenied
User: arn:aws:iam::xxxxxxxxxxx:user/ci is not authorized to perform: cloudwatch:PutMetricData:

If using this --kinesisendpoint option, shouldn't it assume that the user is developing locally and thus not require AWS creds or try to use the AWS API at all?

CommandLineRunner.makeProfile

Hi, I am getting the following error when trying to run the docker container.

I am running it using the parameters that are specified in the README section.

I think the problem is in the first if statement using || instead of &&

    private AWSCredentialsProvider getAwsCredentialsProvider() {
        if (awsProfile != null || awsConfigLocation != null) {
            final String profile = makeProfile(awsProfile);
            return new ProfileCredentialsProvider(awsConfigLocation, profile);
        } else if (awsAccessKey != null && awsSecretKey != null) {
            return new AWSStaticCredentialsProvider(
                    new BasicAWSCredentials(awsAccessKey, awsSecretKey));
        } else {
            return new DefaultAWSCredentialsProviderChain();
        }
    }

Picked up JAVA_TOOL_OPTIONS:
Exception in thread "main" java.lang.NullPointerException
at com.disneystreaming.pg2k4j.CommandLineRunner.makeProfile(CommandLineRunner.java:217)
at com.disneystreaming.pg2k4j.CommandLineRunner.getAwsCredentialsProvider(CommandLineRunner.java:205)
at com.disneystreaming.pg2k4j.CommandLineRunner.getKinesisProducerConfiguration(CommandLineRunner.java:229)
at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.<init>(SlotReaderKinesisWriter.java:77)
at com.disneystreaming.pg2k4j.CommandLineRunner.run(CommandLineRunner.java:25)
at java.base/java.util.Optional.ifPresent(Optional.java:183)
at com.disneystreaming.pg2k4j.CommandLineRunner.main(CommandLineRunner.java:45)

pg2k4j not working correctly on PostgreSQL version 11.x

I have two Postgres databases that I'm trying to use with pg2k4j. The first is on version 9.6.x and works like a charm. The second is on 11.2.x and runs into issues.

  1. When I try to connect with SSL, the connection ends up getting dropped right away, giving the error Database connection failed when writing to copy.
  2. When I turn SSL off, I get at most one batch of records, and then nothing but empty record sets { stream: '...', manual: 0, count: 0, size: 0, matches: 0, timed: 0, UserRecords: 0, KinesisRecords: 0 }

After running pg2k4j with the Java Postgres driver logging turned all the way up, I saw weird packet errors that indicate pgjdbc/pgjdbc#1466 as the root cause. The fact that folks using Debezium also have issues with number 1 (see https://groups.google.com/forum/#!msg/debezium/ocmQdBL6_Rk/BvKt3Yp3AQAJ) supports that pgjdbc is the problem.

I'd love to know if others are having problems with 11.x and to find out if there is a workaround.

Append profile to AWS Credentials File.

I can't use a profile from my AWS credentials file unless I prefix the profile name with "profile ". The log says the latest code doesn't require a profile prefix, but it won't recognize the target profile without it.

Currently using version 1.0.6

WARNING: The legacy profile format requires the 'profile ' prefix before the profile name. The latest code does not require such prefix, and will consider it as part of the profile name. Please remove the prefix if you are seeing this warning.
[main] INFO com.disneystreaming.pg2k4j.PostgresConnector - Attempting to create replication slot pg2k4j
[main] INFO com.disneystreaming.pg2k4j.PostgresConnector - Slot pg2k4j already exists
[main] INFO com.amazonaws.services.kinesis.producer.KinesisProducer - Extracting binaries to /tmp/amazon-kinesis-producer-native-binaries
[main] INFO com.disneystreaming.pg2k4j.SlotReaderKinesisWriter - Consuming from slot pg2k4j
[kpl-daemon-0000] ERROR com.amazonaws.services.kinesis.producer.KinesisProducer - Error in child process
java.lang.RuntimeException: Error running child process
	at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533)
	at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:513)
	at com.amazonaws.services.kinesis.producer.Daemon.access$200(Daemon.java:63)
	at com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:135)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: No AWS profile named 'profile docker'
	at com.amazonaws.auth.profile.ProfilesConfigFile.getCredentials(ProfilesConfigFile.java:158)
	at com.amazonaws.auth.profile.ProfileCredentialsProvider.getCredentials(ProfileCredentialsProvider.java:161)
	at com.amazonaws.services.kinesis.producer.Daemon.makeSetCredentialsMessage(Daemon.java:565)
	at com.amazonaws.services.kinesis.producer.Daemon.startChildProcess(Daemon.java:436)
	at com.amazonaws.services.kinesis.producer.Daemon.access$100(Daemon.java:63)
	at com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:133)
	... 3 more

IllegalArgumentException thrown when connecting RDS Postgres 10 with Kinesis

I installed your application using Docker and connected it to RDS and Kinesis, but the container stops with this error:
ERROR com.disneystreaming.pg2k4j.SlotReaderKinesisWriter - Received exception of type class java.lang.IllegalArgumentException
java.lang.IllegalArgumentException: Data must be less than or equal to 1MB in size, got 9909154 bytes
	at com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:517)
	at com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:406)
	at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.lambda$processByteBuffer$0(SlotReaderKinesisWriter.java:242)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.Streams$StreamBuilderImpl.forEachRemaining(Streams.java:419)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.processByteBuffer(SlotReaderKinesisWriter.java:234)
	at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.readSlotWriteToKinesisHelper(SlotReaderKinesisWriter.java:195)
	at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.readSlotWriteToKinesis(SlotReaderKinesisWriter.java:131)
	at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.runLoop(SlotReaderKinesisWriter.java:86)
	at com.disneystreaming.pg2k4j.CommandLineRunner.run(CommandLineRunner.java:30)
	at java.util.Optional.ifPresent(Optional.java:159)
	at com.disneystreaming.pg2k4j.CommandLineRunner.main(CommandLineRunner.java:45)

I know there is a limit of 1 MB data to be sent to the kinesis.

But I have set 'max_wal_size' to 1024.

Why did you stop using this technology?

Hi there!

First of all, thanks for sharing this work!

The README headline mentions that you are no longer using this tool internally: would you be ok to share why, as well as the replacement solution you found?
I'm currently considering several CDC options for a production system, and any knowledge and guidance would be welcome.

Thank you!

unknown flag: --awsaccesskey

It would be great if we could run the docker container just by passing the Access Key and Secret. The code seems to be supporting this but Docker is throwing an error when specifying those parameters.

docker run -v disneystreaming/pg2k4j --awsaccesskey=AccessKey--awssecret=Secret --region=us-east-2 --pgdatabase=AAAAA --pghost=BBBBB --pguser=CCCCC --pgpassword=DDDD --streamname=EEEEE

unknown flag: --awsaccesskey
See 'docker run --help'.

Underscores not allowed in --kinesisendpoint

Underscores should be allowed in hostnames.

Exception in thread "main" java.lang.IllegalArgumentException: kinesisEndpoint must match the pattern ^([A-Za-z0-9-\.]+)?$, got foo_nginx_1
	at com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.setKinesisEndpoint(KinesisProducerConfiguration.java:985)
	at com.disneystreaming.pg2k4j.CommandLineRunner.getKinesisProducerConfiguration(CommandLineRunner.java:237)
	at com.disneystreaming.pg2k4j.SlotReaderKinesisWriter.<init>(SlotReaderKinesisWriter.java:77)
	at com.disneystreaming.pg2k4j.CommandLineRunner.run(CommandLineRunner.java:25)
	at java.base/java.util.Optional.ifPresent(Optional.java:183)
	at com.disneystreaming.pg2k4j.CommandLineRunner.main(CommandLineRunner.java:45)
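The pattern quoted in the exception accepts only bare hostnames made of ASCII letters, digits, hyphens and dots (or an empty string), so underscores, URL schemes such as "http://", and ports such as ":4567" are all rejected before any connection is attempted. The hypothetical pre-check below (not part of pg2k4j) reuses that exact pattern with java.util.regex, which can help when picking a usable hostname, for example a network alias without underscores.

```java
import java.util.regex.Pattern;

/** Hypothetical pre-check mirroring the endpoint pattern quoted in the exception above. */
final class KinesisEndpointCheck {
    // Pattern copied from the error message: letters, digits, '-' and '.', or empty.
    private static final Pattern ENDPOINT = Pattern.compile("^([A-Za-z0-9-\\.]+)?$");

    private KinesisEndpointCheck() {}

    static boolean isAccepted(String endpoint) {
        return ENDPOINT.matcher(endpoint).matches();
    }

    public static void main(String[] args) {
        System.out.println(isAccepted("foo_nginx_1")); // prints false
        System.out.println(isAccepted("foo-nginx-1")); // prints true
    }
}
```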

Using jvm properties to simplify versioning logic

The versioning logic is currently a bit awkward, with the CI creating new branches/committing/pushing on its own for the sole purpose of versioning.

A simpler approach would be to use jvm properties in order to dynamically resolve the version using some logic around git describe.

You could imagine providing a build script which would essentially redirect the args it gets called with to mvn -Drevision={insert_dynamic_version}, making the version easily reproducible/inspectable locally.
