Change Data Capture with Flink SQL and Debezium

Self-contained demo using Flink SQL and Debezium to build a CDC-based analytics pipeline. All you need is Docker! :whale:

See the slides for context.

Docker

To keep things simple, this demo uses a Docker Compose setup that bundles up all the services you need for your CDC pipeline (Postgres, Kafka and Kafka Connect running the Debezium connector, a Flink cluster and SQL Client, and Elasticsearch with Kibana):

[Diagram: demo overview of the services in the Docker Compose setup]

Getting the setup up and running

docker-compose build

docker-compose up -d

Is everything really up and running?

docker-compose ps

You should be able to access the Flink Web UI (http://localhost:8081), as well as Kibana (http://localhost:5601).

Postgres

Start the Postgres client to have a look at the source tables and run some DML statements later:

docker-compose exec postgres env PGOPTIONS="--search_path=claims" bash -c 'psql -U $POSTGRES_USER postgres'

What tables are we dealing with?

SELECT * FROM information_schema.tables WHERE table_schema='claims';

Debezium

Start the Debezium Postgres connector using the configuration provided in the register-postgres.json file:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json

Check that the connector is running (if you have jq installed, pipe the output through it for readability):

curl http://localhost:8083/connectors/claims-connector/status | jq

The first time it connects to a Postgres server, Debezium takes a consistent snapshot of all database schemas, so you should see that the pre-existing records in the accident_claims table have already been pushed into your Kafka topic:

docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic pg_claims.claims.accident_claims

ℹ️ Have a quick read about the structure of these events in the Debezium documentation.
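
For orientation, here is a heavily abbreviated sketch of the envelope of such a change event (the values are illustrative, not actual output from this demo):

{
  "before": null,     // row state before the change; null for inserts
  "after": { ... },   // row state after the change; null for deletes
  "source": { ... },  // connector metadata (database, schema, table, log position, ...)
  "op": "c",          // c = create, u = update, d = delete, r = snapshot read
  "ts_ms": ...        // time at which the connector processed the event
}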

Is it working?

In the tab you used to start the Postgres client, you can now run some DML statements to see that the changes are propagated all the way to your Kafka topic:

INSERT INTO accident_claims (claim_total, claim_total_receipt, claim_currency, member_id, accident_date, accident_type, accident_detail, claim_date, claim_status) VALUES (500, 'PharetraMagnaVestibulum.tiff', 'AUD', 321, '2020-08-01 06:43:03', 'Collision', 'Blue Ringed Octopus', '2020-08-10 09:39:31', 'INITIAL');
UPDATE accident_claims SET claim_total_receipt = 'CorrectReceipt.pdf' WHERE claim_id = 1001;
DELETE FROM accident_claims WHERE claim_id = 1001;

In the output of your Kafka console consumer, you should now see three consecutive events with op values equal to c (an insert event), u (an update event) and d (a delete event).

Flink SQL

Start the Flink SQL Client:

docker-compose exec sql-client ./sql-client.sh

Register a Postgres catalog, so you can access the metadata of the external tables over JDBC:

CREATE CATALOG postgres WITH (
    'type'='jdbc',
    'property-version'='1',
    'base-url'='jdbc:postgresql://postgres:5432/',
    'default-database'='postgres',
    'username'='postgres',
    'password'='postgres'
);
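
To sanity-check that the catalog registration worked, you can browse it straight from the SQL Client. These are standard Flink SQL commands; the last one switches back so that the tables you create next land in the default in-memory catalog:

SHOW CATALOGS;
USE CATALOG postgres;
SHOW TABLES;
-- switch back before creating the tables below
USE CATALOG default_catalog;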

Create a changelog table that consumes the change events from the pg_claims.claims.accident_claims topic in the debezium-json format, using the same schema as the accident_claims source table:

CREATE TABLE accident_claims
WITH (
  'connector' = 'kafka',
  'topic' = 'pg_claims.claims.accident_claims',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'test-consumer-group',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset'
 )
LIKE postgres.postgres.`claims.accident_claims` (EXCLUDING OPTIONS);

and register a reference members table:

CREATE TABLE members
LIKE postgres.postgres.`claims.members` (INCLUDING OPTIONS);
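
To double-check that the schemas were derived from Postgres as expected, you can describe both tables (plain Flink SQL):

DESCRIBE accident_claims;
DESCRIBE members;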

ℹ️ For more details on the Debezium Format, refer to the Flink documentation.

Ch-ch-ch-ch-changes

You can now query the changelog source table you just created:

SELECT * FROM accident_claims;

and (same as before) insert, update and delete a record in the Postgres client to see how the query results update in (near) real-time.
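
For example, back in the Postgres client (an illustrative statement; any DML against the source table will do):

UPDATE accident_claims SET claim_total = claim_total + 100 WHERE claim_status = 'INITIAL';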

ℹ️ If you're curious to see what's going on behind the scenes (i.e. how queries translate into continuously running Flink jobs) check the Flink Web UI (http://localhost:8081).

Visualizing the Results

Create a sink table that will write to a new agg_insurance_costs index on Elasticsearch:

CREATE TABLE agg_insurance_costs (
  es_key STRING PRIMARY KEY NOT ENFORCED,
  insurance_company STRING,
  accident_detail STRING,
  accident_agg_cost DOUBLE
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://elasticsearch:9200',
  'index' = 'agg_insurance_costs'
);

and submit a continuous query to the Flink cluster that will write the aggregated insurance costs per insurance_company, bucketed by accident_detail (or, what animals are causing the most harm in terms of costs):

INSERT INTO agg_insurance_costs
SELECT UPPER(SUBSTRING(m.insurance_company,0,4) || '_' || SUBSTRING (ac.accident_detail,0,4)) es_key,
       m.insurance_company,
       ac.accident_detail,
       SUM(ac.claim_total) accident_agg_cost
FROM accident_claims ac
JOIN members m
ON ac.member_id = m.id
WHERE ac.claim_status <> 'DENIED'
GROUP BY m.insurance_company, ac.accident_detail;

Finally, create a simple dashboard in Kibana with a 1s refresh rate and use the (very rustic) postgres_datagen.sql data generator script to periodically insert some records into the Postgres source table, creating visible changes in your results:

cat ./postgres_datagen.sql | docker exec -i flink-sql-cdc_postgres_1 psql -U postgres -d postgres

[Screenshot: Kibana dashboard visualizing the aggregated insurance costs]


And that's it!

This demo will get some polishing as CDC support in Flink matures.

If you have any questions or feedback, feel free to DM me on Twitter @morsapaes.
