
Kafka Connect JDBC test environment

This Docker Compose setup can be used to spin up an environment in which to explore and test:

  1. Kafka Connect JDBC source connector: MySQL
  2. Kafka Connect JDBC sink connector: PostgreSQL
  3. KSQL

Current environment

  Confluent Platform    5.1.0
  MySQL                 8.0.13
  Postgres              11.1
  MS SQL Server         2017 (RTM-CU13) (KB4466404) - 14.0.3048.4 (X64)
  Oracle                12.2.0.1.0 (from GitHub)

Start and test the system

START
run ./scripts/start.sh
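
To verify that everything came up, you can list the running containers (a minimal check; exact service names depend on docker-compose.yml):
docker-compose ps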


MySQL
MYSQL_ROOT_PASSWORD=Admin123 && echo $MYSQL_ROOT_PASSWORD
docker exec -it db_mysql bash  -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD'
or
docker-compose exec mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD'
mysql> ALTER USER 'root' IDENTIFIED WITH mysql_native_password BY '123';
mysql> show databases;
mysql> use demo;
mysql> select id, first_name, last_name, username, company, created_date, update_ts from accounts;
Expect to see 10 accounts.
mysql> exit


KAFKA KSQL
docker exec -it ksql-cli ksql http://ksql-server:8088
or
docker-compose exec ksql-cli ksql http://ksql-server:8088
Guidelines for a basic KSQL server: 4 cores, 32 GB RAM, 100 GB SSD, 1 Gbit network.
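
A quick way to confirm the KSQL server is reachable before starting the CLI (a sketch; assumes port 8088 is mapped to the host as in this compose setup):
curl -s http://localhost:8088/info | jq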


KAFKA CONNECT
docker exec -it connect bash
or
docker-compose exec kafka-connect bash
cd /usr/share/java/kafka-connect-jdbc/jars
ls | grep ojdbc8.jar
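
You can also check which connector plugins the Connect worker has loaded through its REST API (assumes port 8083 is exposed on the host, as in the curl calls further down):
curl -s http://localhost:8083/connector-plugins | jq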


POSTGRES
docker exec -it db_postgres psql kafka-sink connect_user
or
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
kafka-sink=# SELECT * from accounts;
Expect to see the 10 accounts that moved here from MySQL through Kafka.
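
The same check can be run non-interactively (a sketch; the database and user are the ones used above):
docker exec -it db_postgres psql -U connect_user -d kafka-sink -c 'SELECT count(*) FROM accounts;'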


STOP
run ./scripts/stop.sh


SCHEMA-REGISTRY
docker logs schema-registry
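
The Schema Registry can also be queried over REST to list the subjects registered by the connectors (assumes port 8081 is mapped to the host, as in the curl call further down):
curl -s http://localhost:8081/subjects | jq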


KAFKA-BROKER

docker exec -it kafka-broker kafka-topics --delete --zookeeper zookeeper:2181 --topic sampleTopic
or
docker exec -it kafka-broker bash
ZK=zookeeper:2181
kafka-producer-perf-test --num-records 10100100 --throughput 10100100 --record-size 100 \
--producer-props acks=1 bootstrap.servers=0.0.0.0:9092 buffer.memory=67108864 compression.type=none batch.size=8196 --topic benchmark1
10,100,100 records sent | 148,000 records/sec | 14.1 MB/sec | 0.433 sec avg latency | 0.579 sec max latency....

kafka-consumer-perf-test --broker-list 0.0.0.0:9092 --messages 10100100 --threads 1 --topic benchmark1

from = 2019-05-01 14:00:00:000
to   = 2019-05-01 14:00:09:001
10,100,100 Msgs    consumed
 1,146,593 Msg/sec consumed
     963.3 MB      consumed
     109.3 MB/sec  consumed
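
To see how much disk the benchmark topic actually occupies, and to remove it when done (a sketch; reuses the $ZK variable set above and the same CLI tools shown below):
kafka-log-dirs --bootstrap-server 0.0.0.0:9092 --describe --topic-list benchmark1
kafka-topics --zookeeper $ZK --topic benchmark1 --delete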

in console #1
kafka-console-producer --broker-list 0.0.0.0:9092 --topic t1
in console #2
kafka-console-consumer --bootstrap-server 0.0.0.0:9092 --topic t1

or in console #1
kafka-console-producer --broker-list 0.0.0.0:9092 --topic t_compact --property "parse.key=true" --property "key.separator=:"
key1:value1
key2:value2
key3:value3
key3:value3
in console #2
kafka-console-consumer --bootstrap-server 0.0.0.0:9092 --topic t_compact --property print.key=true --property key.separator="-" --from-beginning
notes:
1. Compact mode could not be set up.
2. The id is in bytes.
3. CREATE TABLE and streams did not work.


kafka-topics --zookeeper $ZK --topic t1 --delete
kafka-topics --zookeeper $ZK --topic t1 --create  --partitions 3 --replication-factor 1 --config max.message.bytes=64000
kafka-topics --zookeeper $ZK --topic t_compact --create --partitions 3 --replication-factor 1 --config min.insync.replicas=1 --config cleanup.policy=compact --config segment.bytes=1048576
kafka-topics --zookeeper $ZK --list
kafka-configs --zookeeper $ZK --entity-type topics --entity-name t1 --alter --add-config 'retention.ms=1000,max.message.bytes=2048000,cleanup.policy=delete,min.compaction.lag.ms=1000'
kafka-configs --zookeeper $ZK --entity-type topics --describe
kafka-configs --zookeeper $ZK --entity-type brokers --entity-name 0  --alter --add-config log.cleanup.policy=compact
kafka-configs --zookeeper $ZK --entity-type brokers --describe

kafka-topics --zookeeper $ZK --topic t1 --describe
kafka-run-class kafka.tools.GetOffsetShell --broker-list 0.0.0.0:9092 --topic t1 --partitions 0,1,2
kafka-log-dirs --bootstrap-server 0.0.0.0:9092 --describe --topic-list  t_compact



docker logs kafka | grep "INFO Kafka version"


ORACLE
docker-compose exec db_oracle bash -c 'sqlplus sys/$ORACLE_PWD@localhost:1521/ORCLCDB as sysdba'


MS SQL SERVER
docker-compose exec db_mssql bash -c '/opt/mssql-tools/bin/sqlcmd -l 30 -S localhost -U sa -P $SA_PASSWORD'

Connector Configuration

Play with system manually

To create the connectors, you can either run ./scripts/connectors/submit.sh or create the 3 connectors manually, as shown below.

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_mysql_08",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://mysql:3306/demo",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "mysql-08-",
                "mode":"bulk",
                "batch.max.rows":100,
                "table.whitelist" : "demo.accounts",
                "poll.interval.ms" : 360000
                }
        }'

 response:
        {"name":"jdbc_source_mysql_01","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","connection.url":"jdbc:mysql://mysql:3306/demo","connection.user":"connect_user","connection.password":"asgard","topic.prefix":"mysql-01-","mode":"bulk","poll.interval.ms":"10000","name":"jdbc_source_mysql_01"},"tasks":[],"type":null}
curl -s -X GET http://localhost:8083/connectors/|jq
curl -s -X GET http://localhost:8083/connectors/jdbc_source_mysql_01|jq
curl -s -X GET "http://localhost:8083/connectors/jdbc_source_mysql_08/status"|jq
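
The same REST API can pause, resume, or delete a connector while experimenting (a sketch; substitute the name of the connector you created):
curl -s -X PUT http://localhost:8083/connectors/jdbc_source_mysql_08/pause
curl -s -X PUT http://localhost:8083/connectors/jdbc_source_mysql_08/resume
curl -s -X DELETE http://localhost:8083/connectors/jdbc_source_mysql_08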

ksql> PRINT 'mysql-01-accounts' FROM BEGINNING;
You will see the content of the database table in the topic.
Insert into the database:
mysql>INSERT INTO demo.accounts
(`id`,
`first_name`,
`last_name`,
`username`,
`company`,
`created_date`) VALUES
(30,
'lolik10',
'samuel10',
'loliksamuel',
'zim',
'2019-03-03');
After 10 seconds you will see it in the topic.

Create another connector in timestamp mode:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_mysql_ts",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://mysql:3306/demo",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "mysql-08-",
                "mode":"timestamp",
                "table.whitelist" : "demo.accounts",
                "timestamp.column.name": "UPDATE_TS",
                "validate.non.null": false
                }
        }'



mysql> INSERT INTO demo.accounts (`id`, `first_name`, `last_name`, `username`, `company`, `created_date`) VALUES (30, 'lolik10', 'samuel10', 'loliksamuel', 'zim', '2019-03-03');
Query OK, 1 row affected (0.00 sec)
Verify that after 1 second you see it in the topic.

mysql>update demo.accounts set first_name = 'lolik311' where id=31;
Verify that after 1 second you see the update in the topic.

mysql>delete from demo.accounts where id=31;
Note that the JDBC connector does not capture delete operations. If you need them, consider using a CDC (transaction-log based) connector instead.
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
          "name": "src_mysql_12b",
          "config": {
                  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                  "connection.url": "jdbc:mysql://mysql:3306/demo",
                  "connection.user": "connect_user",
                  "connection.password": "asgard",
                  "topic.prefix": "mysql-12b-",
                  "numeric.mapping": "best_fit",
                  "table.whitelist" : "demo.transactions",
                  "mode":"incrementing","incrementing.column.name": "txn_id",
                  "poll.interval.ms" : 3600000,
                  "validate.non.null": false
                  }
          }'





ksql> PRINT 'mysql-01-transactions' FROM BEGINNING;
Verify a problem: the decimal amount field is not serialized correctly. We will have to fix the connector by adding "numeric.mapping": "best_fit".
4/21/19 3:11:04 PM UTC, null, {"txn_id": 1000, "customer_id": 5, "amount": {"bytes": "\nÿ"}, "currency": "IRR", "txn_timestamp": "2018-01-12T14:53:49Z"}
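
One way to apply that fix to an already-running connector is Connect's PUT /connectors/<name>/config endpoint, which replaces the configuration in place (a sketch; the name and settings mirror the src_mysql_12b connector above):
curl -X PUT http://localhost:8083/connectors/src_mysql_12b/config -H "Content-Type: application/json" -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://mysql:3306/demo",
        "connection.user": "connect_user",
        "connection.password": "asgard",
        "topic.prefix": "mysql-12b-",
        "numeric.mapping": "best_fit",
        "table.whitelist" : "demo.transactions",
        "mode": "incrementing",
        "incrementing.column.name": "txn_id",
        "poll.interval.ms" : 3600000,
        "validate.non.null": false
        }'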

curl localhost:8081/subjects/mysql-12a-transactions-value/versions/1 |jq '.schema|fromjson.fields[] | select (.name == "amount")'


mysql>INSERT INTO demo.transactions   VALUES (1001, 1, 1.11, 'RUB', now());

Play with KSQL TABLE AND STREAMS

Use the CREATE STREAM statement to create a stream from a Kafka topic. Use the CREATE STREAM AS SELECT statement to create a query stream from an existing stream. KSQL cannot infer the topic's data format, so you must provide the format of the values that are stored in the topic. First create a connector (using bulk mode is not a best practice; valid values are: bulk, timestamp, incrementing, timestamp+incrementing).

ksql> SET 'auto.offset.reset'='earliest';
ksql> SHOW | LIST functions; LIST properties; LIST streams; LIST topics; LIST tables; LIST queries;
ksql>run script 'tmp/ksql.commands';
or do it manually
ksql>CREATE STREAM s_accounts1 WITH (KAFKA_TOPIC='mysql-08-accounts', VALUE_FORMAT='AVRO', KEY='id');
ksql>CREATE TABLE  t_users (registertime BIGINT,  userid VARCHAR, gender VARCHAR, regionid VARCHAR)  WITH (KAFKA_TOPIC = 'mysql-08-accounts',  VALUE_FORMAT='JSON', KEY = 'userid');
ksql>CREATE STREAM s_accounts2 (id INTEGER, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, USERNAME VARCHAR, company VARCHAR, CREATED_DATE INTEGER, UPDATE_TS BIGINT)  WITH (KAFKA_TOPIC = 'mysql-08-accounts',  VALUE_FORMAT='AVRO', KEY = 'id');
ksql>CREATE TABLE  t_accounts_gb as select last_name, count(*) as count from s_accounts2 group by last_name ;
ksql>SELECT * from t_accounts_gb;
mysql> INSERT INTO demo.accounts (`id`, `first_name`, `last_name`, `username`, `company`, `created_date`) VALUES (40, 'lolik40', 'samuel', 'loliksamuel', 'zim', '2019-03-03');

ksql> SELECT ROWKEY, ID, FIRST_NAME + ' ' + LAST_NAME FROM ACCOUNTS;
Verify that you see nothing; that's because you have to insert a new account first.

ksql>CREATE TABLE t_account_gb2  (last_name string, COUNT bigint) WITH (kafka_topic='mysql-08-accounts', value_format='JSON') ;
ksql>DESCRIBE EXTENDED t_users;
ksql>DROP TABLE  IF EXISTS  t_users;
ksql>SHOW | LIST tables;
ksql>describe extended t_account_gb2; -- see the columns and how many messages
ksql>select * from t_account_gb2;
Note that you do not see anything; that is because no new data was inserted. Let's insert in a different window...
mysql> INSERT INTO demo.accounts (`id`, `first_name`, `last_name`, `username`, `company`, `created_date`) VALUES (40, 'lolik40', 'samuel', 'loliksamuel', 'zim', '2019-03-03');
Query OK, 1 row affected (0.00 sec)
Verify that after 1 second you see it in the table t_account_gb2.

ksql>CREATE TABLE t_account_gb3 (usertimestamp BIGINT, user_id VARCHAR, gender VARCHAR, region_id VARCHAR) WITH (KAFKA_TOPIC='mysql-08-accounts', VALUE_FORMAT='JSON', KEY='user_id');

ksql>print 'ACCOUNTGROUPBY' FROM BEGINNING;
???
ksql>DROP TABLE [IF EXISTS] table_name [DELETE TOPIC];
ksql>DROP STREAM [IF EXISTS] stream_name [DELETE TOPIC];
ksql> PRINT 'mysql-01-accounts' FROM BEGINNING;


docker exec -it db_postgres psql kafka-sink connect_user
kafka-sink=# \h
kafka-sink-# \l
                                       List of databases
    Name    |    Owner     | Encoding |  Collate   |   Ctype    |       Access privileges
------------+--------------+----------+------------+------------+-------------------------------
 kafka-sink | connect_user | UTF8     | en_US.utf8 | en_US.utf8 |
 postgres   | connect_user | UTF8     | en_US.utf8 | en_US.utf8 |
 template0  | connect_user | UTF8     | en_US.utf8 | en_US.utf8 | =c/connect_user              +
            |              |          |            |            | connect_user=CTc/connect_user
 template1  | connect_user | UTF8     | en_US.utf8 | en_US.utf8 | =c/connect_user              +
            |              |          |            |            | connect_user=CTc/connect_user
(4 rows)

kafka-sink=# \dt *.*
kafka-sink=# \d __table__
kafka-sink=# SELECT current_date;
kafka-sink=# SELECT * from accounts;
Verify all accounts are here.
mysql>insert into....
kafka-sink=# SELECT * from accounts;
Verify the new account was added.
kafka-sink=# \q

Other DBs' connectors

  • Postgres

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
            "name": "jdbc_source_postgres_01",
            "config": {
                    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                    "connection.user": "connect_user",
                    "connection.password": "asgard",
                    "topic.prefix": "postgres-01-",
                    "mode":"bulk",
                    "poll.interval.ms" : 3600000,
                    "query" :"select * from accounts"
                    }
            }'
  • Oracle

    Copy ojdbc8.jar into the Connect container:
    docker cp /db-leach/jdbc/lib/ojdbc8.jar kafka-connect-jdbc-mysql_kafka-connect_1:/usr/share/java/kafka-connect-jdbc
    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
                    "name": "jdbc_source_oracle_01",
                    "config": {
                            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                            "connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
                            "connection.user": "connect_user",
                            "connection.password": "asgard",
                            "topic.prefix": "oracle-01-",
                            "table.whitelist" : "NUM_TEST",
                            "mode":"bulk",
                            "poll.interval.ms" : 3600000
                            }
                    }'
  • MS SQL Server

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
                    "name": "jdbc_source_mssql_01",
                    "config": {
                            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                            "connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
                            "connection.user": "connect_user",
                            "connection.password": "Asgard123",
                            "topic.prefix": "mssql-01-",
                            "table.whitelist" : "demo..num_test",
                            "mode":"bulk",
                            "poll.interval.ms" : 3600000
                            }
                    }'
