This Docker Compose can be used to spin up an environment in which to explore and test: 1. Kafka Connect JDBC source connector : mysql. 2. Kafka Connect JDBC sink connector : postgresql. 3. ksql
Current environment
Confluent Platform |
5.1.0 |
MySQL |
8.0.13 |
Postgres |
11.1 |
MS SQL Server |
2017 (RTM-CU13) (KB4466404) - 14.0.3048.4 (X64) |
Oracle |
12.2.0.1.0 (from github) |
START
run ./scripts/start.sh
MySQL
MYSQL_ROOT_PASSWORD=Admin123 & echo $MYSQL_ROOT_PASSWORD
docker exec -it db_mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD'
or
docker-compose exec mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD'
mysql>ALTER USER 'root' IDENTIFIED WITH mysql_native_password BY '123';
mysql> show databases;
mysql> use demo;
mysql>select id,first_name,last_name, username, company,created_date,update_ts from accounts;
expect to see 10 accounts
mysql>exit
KAFKA KSQL
docker exec -it ksql-cli ksql http://ksql-server:8088
or
docker-compose exec ksql-cli ksql http://ksql-server:8088
guidelines for a basic KSQL server:4 cores,32 GB RAM, 100 GB SSD, 1 Gbit network)
KAFKA CONNECT
docker exec -it connect bash
or
docker-compose exec kafka-connect bash
cd /usr/share/java/kafka-connect-jdbc/jars
ls |grep ojdbc8.jar
POSTGRES
docker exec -it db_postgres psql kafka-sink connect_user
or
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
kafka-sink=# SELECT * from accounts;
expect to see 10 accounts that moved here from mysql through kafka
STOP
run ./scripts/stop.sh
SCHEMA-REGISTRY
docker logs schema-registry
KAFKA-BROKER
docker exec -it kafka-broker kafka-topics --delete --zookeeper zookeeper:2181 --topic sampleTopic
or
docker exec -it kafka-broker bash
>ZK=zookeeper:2181
kafka-producer-perf-test --num-records 10100100 --throughput 10100100 --record-size 100 \
--producer-props acks=1 bootstrap.servers=0.0.0.0:9092 buffer.memory=67108864 compression.type=none batch.size=8196 --topic benchmark1
10,100,100 records sent | 148,000 records/sec | 14.1 MB/sec | 0.433 sec avg latency | 0.579 sec max latency....
kafka-consumer-perf-test --broker-list 0.0.0.0:9092 --messages 10100100 --threads 1 --topic benchmark1
from=2019-05-01 14:00:00:000
to = 2019-05-01 14:00:09:001
10,100,100 Msgs consumed
1,146,593 Msg/sec consumed
963.3 MB consumed
109.3 MB/sec consumed
in console #1
kafka-console-producer --broker-list 0.0.0.0:9092 --topic t1
in console #2
kafka-console-consumer --bootstrap-server 0.0.0.0:9092 --topic t1
or in concole #1
kafka-console-producer --broker-list 0.0.0.0:9092 --topic t_compact --property "parse.key=true" --property "key.separator=:"
key1:value1
key2:value2
key3:value3
key3:value3
in console #2
kafka-console-consumer --bootstrap-server 0.0.0.0:9092 --topic t_compact --property print.key=true --property key.separator="-" --from-beginning
notes:
1. compact mode could not be set up!
2. id is in bytes
3. create table ,streams did not work
kafka-topics --zookeeper $ZK --topic t1 --delete
kafka-topics --zookeeper $ZK --topic t1 --create --partitions 3 --replication-factor 1 --config max.message.bytes=64000
kafka-topics --zookeeper $ZK --topic t_compact --create --partitions 3 --replication-factor 1 --config min.insync.replicas=1 --config cleanup.policy=compact --config segment.bytes=1048576
kafka-topics --zookeeper $ZK --list
kafka-configs --zookeeper $ZK --entity-type topics --entity-name t1 --alter --add-config \ 'retention.ms=1000,max.message.bytes=2048000,cleanup.policy=delete,min.compaction.lag.ms=1000'
kafka-configs --zookeeper $ZK --entity-type topics --describe
kafka-configs --zookeeper $ZK --entity-type brokers --entity-name 0 --alter --add-config log.cleanup.policy=compact
kafka-configs --zookeeper $ZK --entity-type brokers --describe
kafka-topics --zookeeper $ZK --topic t1 --describe
kafka-run-class kafka.tools.GetOffsetShell --broker-list 0.0.0.0:9092 --topic t1 --partitions 0,1,2
kafka-log-dirs --bootstrap-server 0.0.0.0:9092 --describe --topic-list t_compact
docker logs kafka|grep "INFO Kafka version"
ORACLE
docker-compose exec db_oracle bash -c 'sqlplus sys/$ORACLE_PWD@localhost:1521/ORCLCDB as sysdba'
MS SQL SERVER
docker-compose exec db_mssql bash -c '/opt/mssql-tools/bin/sqlcmd -l 30 -S localhost -U sa -P $SA_PASSWORD'
-
MySQL-add connector with a post
-
curl http://localhost:8083/connectors – returns a list with all connectors in use
-
curl http://localhost:8083/connectors/{name} – returns details about a specific connector
-
curl http://localhost:8083/connectors/{name}/status – returns running status of specific connector
-
curl http://localhost:8083/connectors/{name} -X DELETE – delete specific connector
-
curl http://localhost:8083/connectors -X POST – creates a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
-
curl http://localhost:8083/connectors -X POST -H "Content-Type: application/json" -d "@data_oracle08.json"
-
curl -s "http://localhost:8081/subjects"|jq - schema registry
-
in order to execute connector u can run 3 scripts or manually run ./scripts/connectors/submit.sh or create 3 connectors manually
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mysql_08",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "mysql-08-",
"mode":"bulk",
"batch.max.rows":100,
"table.whitelist" : "demo.accounts",
"poll.interval.ms" : 360000
}
}'
response:
{"name":"jdbc_source_mysql_01","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","connection.url":"jdbc:mysql://mysql:3306/demo","connection.user":"connect_user","connection.password":"asgard","topic.prefix":"mysql-01-","mode":"bulk","poll.interval.ms":"10000","name":"jdbc_source_mysql_01"},"tasks":[],"type":null}
curl -s -X GET http://localhost:8083/connectors/|jq
curl -s -X GET http://localhost:8083/connectors/jdbc_source_mysql_01|jq
curl -s -X GET "http://localhost:8083/connectors/jdbc_source_mysql_08/status"|jq
ksql> PRINT 'mysql-01-accounts' FROM BEGINNING;
u will see the content of db in the topics
insert to db
mysql>INSERT INTO demo.accounts
(`id`,
`first_name`,
`last_name`,
`username`,
`company`,
`created_date`) VALUES
(30,
'lolik10',
'samuel10',
'loliksamuel',
'zim',
'2019-03-03');
after 10 sec, u will see it in the topic
create another connector in mode : timestamp
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mysql_ts",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "mysql-08-",
"mode":"timestamp",
"table.whitelist" : "demo.accounts",
"timestamp.column.name": "UPDATE_TS",
"validate.non.null": false
}
}'
mysql> INSERT INTO demo.accounts (`id`, `first_name`, `last_name`, `username`, `company`, `created_date`) VALUES (30, 'lolik10', 'samuel10', 'loliksamuel', 'zim', '2019-03-03');
Query OK, 1 row affected (0.00 sec)
verify after 1 sec, that u see it in the topic
mysql>update demo.accounts set first_name = 'lolik311' where id=31;
verify after 1 sec, that u see it in the topic
mysql>delete from demo.accounts where id=31;
verify that jdbc connector does not support delete oparations. if u need it than consider use cdc transaction-log connector.
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "src_mysql_12b",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "mysql-12b-",
"numeric.mapping": "best_fit",
"table.whitelist" : "demo.transactions",
"mode":"incrementing","incrementing.column.name": "txn_id",
"poll.interval.ms" : 3600000,
"validate.non.null": false
}
}'
ksql> PRINT 'mysql-01-transactions' FROM BEGINNING;
verify a problem :decimal amount field is not serialized correctly. we will have to fix the connector:add "numeric.mapping": "best_fit"
4/21/19 3:11:04 PM UTC, null, {"txn_id": 1000, "customer_id": 5, "amount": {"bytes": "\nÿ"}, "currency": "IRR", "txn_timestamp": "2018-01-12T14:53:49Z"}
curl localhost:8081/subjects/mysql-12a-transactions-value/versions/1 |jq '.schema|fromjson.fields[] | select (.name == "amount")'
mysql>INSERT INTO demo.transactions VALUES (1001, 1, 1.11, 'RUB', now());
Use the CREATE STREAM statement to create a stream from a Kafka topic. Use the CREATE STREAM AS SELECT statement to create a query stream from an existing stream. KSQL can’t infer the topic’s data format, so you must provide the format of the values that are stored in the topic create a connector(using bulk is not a best practice, Valid Values: [, bulk, timestamp, incrementing, timestamp+incrementing])
ksql> SET 'auto.offset.reset'='earliest'; ksql>>SHOW |LIST functions;LIST properties;LIST streams;LIST topics;LIST tables;LIST queries; ksql>run script 'tmp/ksql.commands'; or do it manually ksql>CREATE STREAM s_accounts1 WITH (KAFKA_TOPIC='mysql-08-accounts', VALUE_FORMAT='AVRO', KEY='id'); ksql>CREATE TABLE t_users (registertime BIGINT, userid VARCHAR, gender VARCHAR, regionid VARCHAR) WITH (KAFKA_TOPIC = 'mysql-08-accounts', VALUE_FORMAT='JSON', KEY = 'userid'); ksql>CREATE STREAM s_accounts2 (id INTEGER, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, USERNAME VARCHAR, company VARCHAR, CREATED_DATE INTEGER, UPDATE_TS BIGINT) WITH (KAFKA_TOPIC = 'mysql-08-accounts', VALUE_FORMAT='AVRO', KEY = 'id'); ksql>CREATE TABLE t_accounts_gb as select last_name, count(*) as count from s_accounts2 group by last_name ; ksql>SELECT * from t_accounts_gb; mysql> INSERT INTO demo.accounts (`id`, `first_name`, `last_name`, `username`, `company`, `created_date`) VALUES (40, 'lolik40', 'samuel', 'loliksamuel', 'zim', '2019-03-03'); ksql> SELECT ROWKEY, ID, FIRST_NAME + ' ' + LAST_NAME FROM ACCOUNTS; verify u see nothing. that's because u have to insert a new account first ksql>CREATE TABLE t_account_gb2 (last_name string, COUNT bigint) WITH (kafka_topic='mysql-08-accounts', value_format='JSON') ; ksql>DESCRIBE EXTENDED t_users; ksql>DROP TABLE IF EXISTS t_users; ksql>SHOW | LIST tables; ksql>describe extended t_account_gb2; --see the columns & how many massages ksql>select * from t_account_gb2; note u do not see anything. it is because no new data is inserted. let's insert in different window... mysql> INSERT INTO demo.accounts (`id`, `first_name`, `last_name`, `username`, `company`, `created_date`) VALUES (40, 'lolik40', 'samuel', 'loliksamuel', 'zim', '2019-03-03'); Query OK, 1 row affected (0.00 sec) verify after 1 sec, that u see it in the table accountGroupByTable2 ksql>CREATE TABLE t_account_gb3 (usertimestamp BIGINT, user_id VARCHAR, gender VARCHAR, region_id VARCHAR) KAFKA_TOPIC = 'mysql-08-accounts',KEY = 'user_id'); ksql>print 'ACCOUNTGROUPBY' FROM BEGINNING; ??? ksql>DROP TABLE [IF EXISTS] table_name [DELETE TOPIC]; ksql>DROP STREAM [IF EXISTS] stream_name [DELETE TOPIC]; ksql> PRINT 'mysql-01-accounts' FROM BEGINNING docker exec -it db_postgres psql kafka-sink connect_user kafka-sink=# \h kafka-sink-# \l List of databases Name | Owner | Encoding | Collate | Ctype | Access privileges ------------+--------------+----------+------------+------------+------------------------------- kafka-sink | connect_user | UTF8 | en_US.utf8 | en_US.utf8 | postgres | connect_user | UTF8 | en_US.utf8 | en_US.utf8 | template0 | connect_user | UTF8 | en_US.utf8 | en_US.utf8 | =c/connect_user + | | | | | connect_user=CTc/connect_user template1 | connect_user | UTF8 | en_US.utf8 | en_US.utf8 | =c/connect_user + | | | | | connect_user=CTc/connect_user (4 rows) kafka-sink=# \dt *.* kafka-sink=# \d __table__ kafka-sink=# SELECT current_date; kafka-sink=# SELECT * from accounts; verify all accounts are here mysql>insert into.... kafka-sink=# SELECT * from accounts; verify added an account kafka-sink=# \q
-
Postgres
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_postgres_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:postgresql://postgres:5432/postgres", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "postgres-01-", "mode":"bulk", "poll.interval.ms" : 3600000, "query" :"select * from accounts" } }'
-
Oracle
cp ojdbc8.jar docker cp /db-leach/jdbc/lib/ojdbc8.jar kafka-connect-jdbc-mysql_kafka-connect_1:/usr/share/java/kafka-connect-jdbc curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_oracle_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "oracle-01-", "table.whitelist" : "NUM_TEST", "mode":"bulk", "poll.interval.ms" : 3600000 } }'
-
MS SQL Server
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mssql_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo", "connection.user": "connect_user", "connection.password": "Asgard123", "topic.prefix": "mssql-01-", "table.whitelist" : "demo..num_test", "mode":"bulk", "poll.interval.ms" : 3600000 } }'