Kafka Connect: MySQL CDC to Apache Kafka with Debezium
I was learning Kafka, then moved on to Kafka Connect and started exploring how to capture database change events and source them into a Kafka topic. Here we are going to capture MySQL database change events with Debezium, source them into a Kafka topic, and consume them with kafka-console-consumer.
What you need to start with [Note: I have done all this on my Ubuntu 18.04 machine]:
- Apache Kafka + ZooKeeper [Kafka tar file: v2.12-2.4.0]
- Debezium 0.9.2 [Debezium_v0.9.2]
- MySQL 5.7 installed on your PC, or any MySQL instance of that version
Now we are all set for a head start. The steps we will follow:
- Unzip/untar all downloaded resources
- Configure MySQL
- Configure Apache Kafka
- Set up the connector
- Test how the data streaming happens
Step 1:
- Untar your Kafka tar file in a suitable location, like a dedicated folder on any drive, with this command:
tar xvzf your_filename.tar.gz
- Inside your untarred Kafka folder, untar the Debezium tar file you downloaded from the link above. [You can untar it in a separate path too!]
- Your Debezium folder must have these contents:
antlr4-runtime-4.7.jar
CONTRIBUTE.md
debezium-connector-mysql-0.9.2.Final.jar
debezium-ddl-parser-0.9.2.Final.jar
mysql-binlog-connector-java-0.19.0.jar
protobuf-java-2.6.1.jar
CHANGELOG.md
COPYRIGHT.txt
debezium-core-0.9.2.Final.jar
LICENSE.txt
mysql-connector-java-8.0.13.jar
README.md
Step 2:
Now we have to enable the binary log to capture database change events. To do that,
find your MySQL my.cnf file [in my case it was at /etc/mysql/my.cnf]. Open it with root permission and add these lines:
[mysqld]
server-id = 123
log_bin = mysql-bin
binlog_format = row
binlog_row_image = full
expire_logs_days = 10
Set server-id to any unique value. Save and exit.
Restart MySQL server:
sudo /etc/init.d/mysql restart
or by this:
sudo service mysql restart
Now open a terminal and check that the server-id was picked up, using the command below (verify the server-id in the output matches the one you set):
mysqld --verbose --help 2>/dev/null | grep -w server-id
Now check that your binary-log option is enabled:
mysqladmin variables -uroot -p|grep log_bin
Remove -p if your root user has no password.
The output should show log_bin with a value of ON.
Login to MySQL server and create a dedicated user for connector we are going to create:
mysql -u root -p
Then create a debezium user with password dbz:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
MySQL config is ok now.
Step 3:
cd into your Kafka directory and follow the steps below sequentially:
- start zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
- start kafka default broker:
bin/kafka-server-start.sh config/server.properties
and wait a while for the broker to be fully up.
- Create a topic named test:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
- Inside the kafka-dir/config path, open the connect-standalone.properties file and add this:
plugin.path=/your_debezium_folder_container_folder_path
- This is not the Debezium folder itself but the folder containing it, so be careful. In my case I put the Debezium folder inside the Kafka folder, so my path was:
plugin.path=/media/t***t/projects/playground/kafka/kafka_2.12-2.4.0
- In the same connect-standalone.properties file, make sure these are set:
key.converter.schemas.enable=true
value.converter.schemas.enable=true
- Change the topic name to "test" in the connect-console-source.properties and connect-console-sink.properties files [not mandatory].
- Now start the connector:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-console-source.properties config/connect-console-sink.properties
Step 4:
Kafka Connect setup [last but very important]:
Copy and make a curl request with this:
curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d '{
    "name": "mysql-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "localhost",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "42",
      "database.server.name": "test",
      "database.history.kafka.bootstrap.servers": "localhost:9092",
      "database.history.kafka.topic": "dbhistory.test",
      "include.schema.changes": "true"
    }
  }'
All fields mentioned here are self-explanatory. Note that database.server.id is the connector's own replication-client ID: it must be a numeric value that is unique across your MySQL setup, so it should not clash with the server-id you set in my.cnf or with any other replication client.
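If you prefer scripting the registration instead of a raw curl, here is a minimal Python sketch using only the standard library. The helper names build_connector_config and register_connector are my own, just to illustrate the same POST to the Kafka Connect REST API:

```python
import json
import urllib.request

def build_connector_config(name, host, port, user, password, server_id, server_name):
    """Build the Debezium MySQL connector registration payload (same shape as the curl above)."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": host,
            "database.port": str(port),
            "database.user": user,
            "database.password": password,
            "database.server.id": str(server_id),
            "database.server.name": server_name,
            "database.history.kafka.bootstrap.servers": "localhost:9092",
            "database.history.kafka.topic": "dbhistory." + server_name,
            "include.schema.changes": "true",
        },
    }

def register_connector(payload, url="http://localhost:8083/connectors/"):
    """POST the payload to the Kafka Connect REST API (requires Connect to be running)."""
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )
    return urllib.request.urlopen(req)

payload = build_connector_config(
    "mysql-connector", "localhost", 3306, "debezium", "dbz", 42, "test")
print(payload["config"]["database.history.kafka.topic"])  # dbhistory.test
```

Calling register_connector(payload) only works once the connect-standalone process from Step 3 is up and listening on port 8083.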
To check if your connector setup is ok, run this curl request to check its status:
curl -s "http://localhost:8083/connectors" | jq '.[]' \
  | xargs -I mysql-connector9 curl -s "http://localhost:8083/connectors/mysql-connector9/status" \
  | jq -c -M '[.name,.connector.state,.tasks[].state] | join(":|:")' \
  | column -s : -t | tr -d \" | sort
If everything is OK, you will see the connector state and the task state, both with a status of RUNNING.
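If the jq/xargs chain above feels opaque, the flattening it performs can be sketched in Python. The function name summarize_status is mine, and the sample response below is a trimmed illustration of what GET /connectors/&lt;name&gt;/status returns:

```python
import json

def summarize_status(status):
    """Flatten a Kafka Connect status response into name:|:connector-state:|:task-states."""
    parts = [status["name"], status["connector"]["state"]]
    parts += [task["state"] for task in status["tasks"]]
    return ":|:".join(parts)

# A trimmed sample response from GET /connectors/mysql-connector/status:
sample = {
    "name": "mysql-connector",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
}
print(summarize_status(sample))  # mysql-connector:|:RUNNING:|:RUNNING
```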
Assuming your connector setup is okay, now check the topic list. You will see one topic per database table, named with the server name you set in the connector config ("test" in our case) as the prefix, in the form serverName.dbName.tableName. For example, if my database dbName has tables t1, t2, t3, the topics will be:
test.dbName.t1, test.dbName.t2, etc...
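The naming convention above can be sketched as a tiny helper (the function name change_topic is mine, just to make the pattern explicit):

```python
def change_topic(server_name, db, table):
    """Debezium publishes each table's change events to serverName.dbName.tableName."""
    return "{}.{}.{}".format(server_name, db, table)

# With server name "test" and database "dbName":
for table in ("t1", "t2", "t3"):
    print(change_topic("test", "dbName", table))  # test.dbName.t1, test.dbName.t2, test.dbName.t3
```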
List all topics:
bin/kafka-topics.sh --zookeeper localhost:2181 --list
Now, to test the streaming, pick any topic named after one of your tables and run the consumer console client:
bin/kafka-console-consumer.sh --bootstrap-server "localhost:9092" --topic "test.tigerhrm.tigerhrm_organization" --from-beginning
Now update or insert into that table, and you will get a response like this [check the "after" key for your changes]:
{ "schema":{ "type":"struct", "fields":[ { "type":"struct", "fields":[ { "type":"int64", "optional":false, "field":"id" }, { "type":"string", "optional":true, "field":"code" }, { "type":"int32", "optional":true, "default":1, "field":"object_version_id" }, { "type":"string", "optional":true, "field":"type" }, { "type":"string", "optional":true, "field":"name_en" }, { "type":"string", "optional":true, "field":"name_bn" }, { "type":"string", "optional":true, "field":"address" }, { "type":"string", "optional":true, "field":"email" }, { "type":"string", "optional":true, "field":"contact" }, { "type":"int16", "optional":true, "field":"status" }, { "type":"string", "optional":true, "field":"short_name" } ], "optional":true, "name":"test.tigerhrm.tigerhrm_organization.Value", "field":"before" }, { "type":"struct", "fields":[ { "type":"int64", "optional":false, "field":"id" }, { "type":"string", "optional":true, "field":"code" }, { "type":"int32", "optional":true, "default":1, "field":"object_version_id" }, { "type":"string", "optional":true, "field":"type" }, { "type":"string", "optional":true, "field":"name_en" }, { "type":"string", "optional":true, "field":"name_bn" }, { "type":"string", "optional":true, "field":"address" }, { "type":"string", "optional":true, "field":"email" }, { "type":"string", "optional":true, "field":"contact" }, { "type":"int16", "optional":true, "field":"status" }, { "type":"string", "optional":true, "field":"short_name" } ], "optional":true, "name":"test.tigerhrm.tigerhrm_organization.Value", "field":"after" }, { "type":"struct", "fields":[ { "type":"string", "optional":true, "field":"version" }, { "type":"string", "optional":true, "field":"connector" }, { "type":"string", "optional":false, "field":"name" }, { "type":"int64", "optional":false, "field":"server_id" }, { "type":"int64", "optional":false, "field":"ts_sec" }, { "type":"string", "optional":true, "field":"gtid" }, { "type":"string", "optional":false, "field":"file" }, { "type":"int64", 
"optional":false, "field":"pos" }, { "type":"int32", "optional":false, "field":"row" }, { "type":"boolean", "optional":true, "default":false, "field":"snapshot" }, { "type":"int64", "optional":true, "field":"thread" }, { "type":"string", "optional":true, "field":"db" }, { "type":"string", "optional":true, "field":"table" }, { "type":"string", "optional":true, "field":"query" } ], "optional":false, "name":"io.debezium.connector.mysql.Source", "field":"source" }, { "type":"string", "optional":false, "field":"op" }, { "type":"int64", "optional":true, "field":"ts_ms" } ], "optional":false, "name":"test.tigerhrm.tigerhrm_organization.Envelope" }, "payload":{ "before":{ "id":5, "code":"500", "object_version_id":0, "type":"ORG_NAME", "name_en":"werwer", "name_bn":null, "address":"fdgd", "email":"a@b.com", "contact":null, "status":1, "short_name":"rewerwerwer" }, "after":{ "id":5, "code":"500", "object_version_id":0, "type":"ORG_NAME", "name_en":"test org", "name_bn":null, "address":"my demo add", "email":"a@b.com", "contact":null, "status":1, "short_name":"rewerwerwer" }, "source":{ "version":"0.9.2.Final", "connector":"mysql", "name":"test", "server_id":123, "ts_sec":1581402039, "gtid":null, "file":"mysql-bin.000011", "pos":5106, "row":0, "snapshot":false, "thread":65, "db":"tigerhrm", "table":"tigerhrm_organization", "query":null }, "op":"u", "ts_ms":1581402039640 } } { "schema":{ "type":"struct", "fields":[ { "type":"struct", "fields":[ { "type":"string", "optional":true, "field":"code" }, { "type":"bytes", "optional":true, "field":"authentication" } ], "optional":true, "name":"test.tigerhrm.oauth_code.Value", "field":"before" }, { "type":"struct", "fields":[ { "type":"string", "optional":true, "field":"code" }, { "type":"bytes", "optional":true, "field":"authentication" } ], "optional":true, "name":"test.tigerhrm.oauth_code.Value", "field":"after" }, { "type":"struct", "fields":[ { "type":"string", "optional":true, "field":"version" }, { "type":"string", 
"optional":true, "field":"connector" }, { "type":"string", "optional":false, "field":"name" }, { "type":"int64", "optional":false, "field":"server_id" }, { "type":"int64", "optional":false, "field":"ts_sec" }, { "type":"string", "optional":true, "field":"gtid" }, { "type":"string", "optional":false, "field":"file" }, { "type":"int64", "optional":false, "field":"pos" }, { "type":"int32", "optional":false, "field":"row" }, { "type":"boolean", "optional":true, "default":false, "field":"snapshot" }, { "type":"int64", "optional":true, "field":"thread" }, { "type":"string", "optional":true, "field":"db" }, { "type":"string", "optional":true, "field":"table" }, { "type":"string", "optional":true, "field":"query" } ], "optional":false, "name":"io.debezium.connector.mysql.Source", "field":"source" }, { "type":"string", "optional":false, "field":"op" }, { "type":"int64", "optional":true, "field":"ts_ms" } ], "optional":false, "name":"test.tigerhrm.oauth_code.Envelope" }, "payload":{ "before":null, "after":{ "code":"22", "authentication":"MTIyMzMz" }, "source":{ "version":"0.9.2.Final", "connector":"mysql", "name":"test", "server_id":123, "ts_sec":1581401679, "gtid":null, "file":"mysql-bin.000011", "pos":4792, "row":0, "snapshot":false, "thread":65, "db":"tigerhrm", "table":"oauth_code", "query":null }, "op":"c", "ts_ms":1581401679228 } }
Now you can do whatever you want with your data: parse it, modify it, sink it into another DB, etc.
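As a starting point for parsing, here is a sketch that extracts the interesting parts from one change event like the ones shown above, assuming you already have the message value as a JSON string (the helper name parse_change_event is mine):

```python
import json

def parse_change_event(raw):
    """Extract operation, table, and before/after row images from a Debezium envelope."""
    event = json.loads(raw)
    payload = event["payload"]
    op_names = {"c": "create", "u": "update", "d": "delete"}
    return {
        "op": op_names.get(payload["op"], payload["op"]),
        "table": payload["source"]["table"],
        "before": payload["before"],  # None for inserts
        "after": payload["after"],    # None for deletes
    }

# A trimmed update event in the shape shown above:
sample = json.dumps({
    "schema": {},
    "payload": {
        "op": "u",
        "source": {"table": "tigerhrm_organization"},
        "before": {"id": 5, "name_en": "werwer"},
        "after": {"id": 5, "name_en": "test org"},
    },
})
print(parse_change_event(sample))
```

In a real consumer you would feed each message value from the topic into this function instead of the hardcoded sample.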
Hope this will help!