Kafka Stream API: MySQL CDC to Apache Kafka with Debezium

While learning Kafka, I moved on to Kafka Streams and wanted to know how to capture database change events and source them to a Kafka topic. Here we will capture MySQL change events, source them to Kafka topics, and consume them with kafka-console-consumer.

What you need to start with [Note: I did all of this on an Ubuntu 18.04 machine]:

  1. Apache Kafka + ZooKeeper [Kafka tar file: v2.12-2.4.0, i.e. Scala 2.12 / Kafka 2.4.0]
  2. Debezium 0.9.2: Debezium_v0.9.2
  3. MySQL 5.7 installed on your PC, or access to any MySQL database of that version

Now we are all set for a head start. The steps we will follow:

  1. Unzip/untar all downloaded resources
  2. Configure MySQL
  3. Configure Apache Kafka
  4. Connector setup
  5. Test how data streaming happens

Step 1:

  • Untar your Kafka tar file in a suitable location, such as a dedicated folder on any drive, with this command:
    tar xvzf your-filename.tar.gz

  • Inside the extracted Kafka folder, untar the Debezium tar file you downloaded from the link above. [You can untar it in a separate path too!]
  • Your folder structure will look like this:

Your debezium folder must have these contents:


Step 2:

Now we have to enable the binary log, which is where the database change events are captured from. To do that, find your MySQL my.cnf file [in my case it was at /etc/mysql/my.cnf]. Open it with root permission and add these lines under the [mysqld] section:


server-id         = 123

log_bin           = mysql-bin

binlog_format     = row

binlog_row_image  = full

expire_logs_days  = 10

Set server-id to any unique value. Save the file and exit.
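Before restarting, the settings above can be sanity-checked. The following is a minimal Python sketch, not part of the original setup: the function name check_binlog_settings and the SAMPLE text are made up for illustration, and it only inspects a [mysqld] section passed in as text. A real my.cnf may contain !includedir directives, which this parser does not follow.

```python
import configparser

# Values the steps above ask for; compared case-insensitively.
REQUIRED = {"binlog_format": "row", "binlog_row_image": "full"}

def check_binlog_settings(cnf_text):
    """Return a list of problems found in the [mysqld] section of cnf_text."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section found"]
    # MySQL treats '-' and '_' in option names as equivalent.
    section = {
        key.replace("-", "_"): (value or "").strip()
        for key, value in parser.items("mysqld")
    }
    problems = []
    if not section.get("server_id", "").isdigit():
        problems.append("server-id is missing or not numeric")
    if "log_bin" not in section:
        problems.append("log_bin is not set")
    for key, expected in REQUIRED.items():
        actual = section.get(key, "")
        if actual.lower() != expected:
            problems.append(f"{key} should be {expected!r}, found {actual!r}")
    return problems

# Hypothetical file content mirroring the lines added above.
SAMPLE = """
[mysqld]
server-id         = 123
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10
"""

print(check_binlog_settings(SAMPLE))
```

An empty list means the section contains everything this walkthrough relies on.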
Restart the MySQL server:

sudo /etc/init.d/mysql restart

or:

sudo service mysql restart 

Now open a terminal and check that the server-id was picked up: run the command below and look for server-id in its output. It should match the value you set:

mysqld --verbose --help

Now check that the binary log option is enabled:

mysqladmin variables -uroot -p|grep log_bin

Remove -p if your database has no password for the root user.
The output should show log_bin set to ON.

Log in to the MySQL server and create a dedicated user for the connector we are going to create:

mysql -u root -p 
Omit the -p option if root has no password.
Then create a user named debezium with the password dbz:
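The exact statements are not shown in this post, so here is a hedged Python sketch that prints SQL you could paste into the mysql prompt. The privilege list follows what the Debezium MySQL connector documentation asks for. The helper name debezium_user_sql and the '%' host are assumptions of this sketch, and it does no quoting or escaping of the inputs.

```python
def debezium_user_sql(user, password, host="%"):
    """Build SQL statements that create a MySQL user for the connector.

    Assumption: host '%' lets the user connect from anywhere; tighten as needed.
    """
    account = f"'{user}'@'{host}'"
    privileges = (
        "SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT"
    )
    return [
        f"CREATE USER {account} IDENTIFIED BY '{password}';",
        f"GRANT {privileges} ON *.* TO {account};",
        "FLUSH PRIVILEGES;",
    ]

# Print the statements for the user and password used in this walkthrough.
for statement in debezium_user_sql("debezium", "dbz"):
    print(statement)
```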


The MySQL configuration is now complete.

Step 3:

 cd to your Kafka directory and follow the steps below in order:

  • Start ZooKeeper:
    bin/zookeeper-server-start.sh config/zookeeper.properties

  • Start the default Kafka broker:
    bin/kafka-server-start.sh config/server.properties

    and wait a while for the startup to complete.
  • Create a topic named test:

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

  • Inside the kafka-dir/config path, open the connect-standalone.properties file and add the plugin.path property. It must point not to the debezium folder itself but to the folder that contains it, so be careful. In my case, I put the debezium folder inside the kafka folder, so my setting was:
    plugin.path=/media/t***t/projects/playground/kafka/kafka_2.12-2.4.0
    Save the file and check that the schema options of the key and value converters (key.converter.schemas.enable and value.converter.schemas.enable in the stock file) are set to true.
  • Change the topic name to "test" in the connect-console-source.properties and connect-console-sink.properties files [not mandatory].
  • Now start the connector:
    bin/connect-standalone.sh config/connect-standalone.properties config/connect-console-source.properties config/connect-console-sink.properties
Step 4:
 Kafka Connect setup [last but very important]:
  Copy this curl request and run it:

curl -i -X POST -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d '{
      "name": "mysql-connector",
      "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": "localhost",
            "database.port": "3306",
            "database.user": "debezium",
            "database.password": "dbz",
            "database.server.id": "42",
            "database.server.name": "test",
            "database.history.kafka.bootstrap.servers": "localhost:9092",
            "database.history.kafka.topic": "dbhistory.test",
            "include.schema.changes": "true"
      }
    }'



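Most fields here are self-explanatory. Note that database.server.id is the ID the connector itself uses when it joins MySQL as a replication client, so it must be unique and must differ from the server-id you set in your my.cnf file (here 42 versus 123).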
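If you prefer to register the connector from a script, the same request can be sketched in Python with only the standard library. The function names build_connector_payload and register_connector are made up for this example. The URL and field values are the ones used in this walkthrough, and the actual POST is left commented out because it needs Kafka Connect listening on port 8083.

```python
import json
import urllib.request

def build_connector_payload(name, server_name, connector_server_id):
    """Return the JSON-serializable body for POST /connectors."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": "localhost",
            "database.port": "3306",
            "database.user": "debezium",
            "database.password": "dbz",
            # The connector's own replication client ID, sent as a string.
            "database.server.id": str(connector_server_id),
            "database.server.name": server_name,
            "database.history.kafka.bootstrap.servers": "localhost:9092",
            "database.history.kafka.topic": f"dbhistory.{server_name}",
            "include.schema.changes": "true",
        },
    }

def register_connector(payload, connect_url="http://localhost:8083/connectors/"):
    """POST the payload to the Kafka Connect REST API; return the HTTP status."""
    request = urllib.request.Request(
        connect_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status

payload = build_connector_payload("mysql-connector", "test", 42)
print(json.dumps(payload, indent=2))
# register_connector(payload)  # uncomment once Kafka Connect is running
```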
To check that your connector is set up correctly, run this curl request to see its status:

curl -s "http://localhost:8083/connectors" | jq '.[]' | xargs -I{} curl -s "http://localhost:8083/connectors/{}/status" | jq -c -M '[.name,.connector.state,.tasks[].state] | join(":|:")' | column -s : -t | tr -d \" | sort

If everything is OK, you will see RUNNING for both the connector state and the task state.
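The same check can be expressed over the JSON that the status endpoint returns. This is a small sketch under assumptions: SAMPLE_STATUS is a hypothetical response shaped like the body of GET /connectors/<name>/status, and the helper names are invented for illustration.

```python
def summarize_status(status):
    """Return (name, connector_state, [task_states]) from a status document."""
    task_states = [task["state"] for task in status.get("tasks", [])]
    return status["name"], status["connector"]["state"], task_states

def is_healthy(status):
    """True when the connector is RUNNING and it has tasks that are all RUNNING."""
    _, connector_state, task_states = summarize_status(status)
    return (
        connector_state == "RUNNING"
        and bool(task_states)
        and all(state == "RUNNING" for state in task_states)
    )

# Hypothetical response body for the connector registered above.
SAMPLE_STATUS = {
    "name": "mysql-connector",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
}

print(summarize_status(SAMPLE_STATUS))
print(is_healthy(SAMPLE_STATUS))
```

A connector with no tasks is treated as unhealthy here, since a RUNNING connector without a running task does not stream anything.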
Assuming your connector setup is okay, check the topic list. You will see one topic per table, named with the database.server.name value as the prefix, followed by the database name and the table name. For example, we set database.server.name to test, and a few of my tables are t1, t2, t3 in the database dbName, so the topics will be:
test.dbName.t1, test.dbName.t2, etc...
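The naming rule is simple enough to write down as code. This tiny Python sketch (the function names are illustrative only) computes the topic names you should expect for a given server name, database, and list of tables:

```python
def change_topic(server_name, database, table):
    """Topic that receives row-level change events for one table."""
    return f"{server_name}.{database}.{table}"

def expected_topics(server_name, database, tables):
    """Topic names for every table in the list, in the same order."""
    return [change_topic(server_name, database, table) for table in tables]

print(expected_topics("test", "dbName", ["t1", "t2", "t3"]))
```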
List all topics:

bin/kafka-topics.sh --zookeeper localhost:2181 --list

Now, to test streaming, pick any topic named after one of your tables and run the console consumer client:

bin/kafka-console-consumer.sh --bootstrap-server "localhost:9092" --topic "test.tigerhrm.tigerhrm_organization" --from-beginning

Now update or insert into that table, and you will get a response like this [look under the "after" key for your changes]:

         "name_en":"test org",
         "address":"my demo add",
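To work with these messages in code, you can parse the change event envelope. The sketch below assumes the usual Debezium layout, with before, after, and op fields nested under payload when converter schemas are enabled. SAMPLE_MESSAGE is a hypothetical, heavily trimmed event that reuses the two fields shown above, and describe_change is a name invented for this example.

```python
import json

# Operation codes used in the "op" field of a Debezium change event.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}

def describe_change(message):
    """Return (operation, before, after) for one change event string."""
    event = json.loads(message)
    if event is None:
        # A null value is a tombstone, emitted after a delete.
        return "tombstone", None, None
    # With schemas enabled the envelope sits under "payload";
    # otherwise the envelope is the top-level object.
    payload = event.get("payload", event)
    operation = OPERATIONS.get(payload.get("op"), "unknown")
    return operation, payload.get("before"), payload.get("after")

# Hypothetical, trimmed event for an insert into the table.
SAMPLE_MESSAGE = json.dumps({
    "payload": {
        "before": None,
        "after": {"name_en": "test org", "address": "my demo add"},
        "op": "c",
    }
})

operation, before, after = describe_change(SAMPLE_MESSAGE)
print(operation, after["name_en"], after["address"])
```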

Now you can do whatever you want with your data: parse it, modify it, sink it to another database, and so on.
Hope this will help!

