Kafka Stream API: MySQL CDC to Apache Kafka with Debezium

I was learning Kafka, then moved on to Kafka Streams and to capturing database change events and publishing them to Kafka topics. Here we will capture MySQL change events with Debezium, publish them to Kafka topics, and read them back with kafka-console-consumer.



What you need to start with [Note: I did all of this on an Ubuntu 18.04 machine]:



  1. Apache Kafka + ZooKeeper [Kafka tar file: kafka_2.12-2.4.0, i.e. Kafka 2.4.0 built for Scala 2.12]
  2. Debezium 0.9.2 [the MySQL connector plugin archive]
  3. MySQL 5.7 installed on your PC, or any reachable MySQL database of that version

Now we are all set for a head start. The steps we will follow:

  1. Unzip/untar all downloaded resources
  2. Configure MySQL
  3. Configure Apache Kafka
  4. Set up the connector
  5. Test how data streaming happens

Step 1:

  • Untar your Kafka tar file in a suitable location, such as a dedicated folder on any drive, with this command:
    tar xvzf your-filename.tar.gz

  • Inside the extracted Kafka folder, untar the Debezium tar file listed above. [You can untar it in a separate path too!]
  • With that layout, the Debezium folder sits directly inside the Kafka folder.

Your Debezium folder must have these contents:


antlr4-runtime-4.7.jar
CHANGELOG.md
CONTRIBUTE.md
COPYRIGHT.txt
debezium-connector-mysql-0.9.2.Final.jar
debezium-core-0.9.2.Final.jar
debezium-ddl-parser-0.9.2.Final.jar
LICENSE.txt
mysql-binlog-connector-java-0.19.0.jar
mysql-connector-java-8.0.13.jar
protobuf-java-2.6.1.jar
README.md

Step 2:

Now we have to enable the binary log, which is where MySQL records the row-level changes that Debezium reads. To do that, find your MySQL my.cnf file [in my case it was at /etc/mysql/my.cnf]. Open it with root permission and add these lines:

[mysqld]
server-id         = 123
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10
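As a quick sanity check on these settings, here is a minimal Python sketch that reads a my.cnf fragment and reports anything that looks wrong for Debezium. The function name check_mysqld_settings and the SAMPLE text are only illustrative. It assumes the fragment is plain INI text; a real my.cnf may contain !includedir lines, which this sketch does not handle.

```python
import configparser


def check_mysqld_settings(cnf_text):
    """Return a list of problems found in the [mysqld] section of cnf_text."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section found"]

    # MySQL accepts '-' and '_' interchangeably in option names.
    settings = {
        key.replace("-", "_"): (value or "").strip().lower()
        for key, value in parser.items("mysqld")
    }

    problems = []
    if not settings.get("server_id", "").isdigit():
        problems.append("server-id must be set to a number")
    if "log_bin" not in settings:
        problems.append("log_bin is not set")
    if settings.get("binlog_format") != "row":
        problems.append("binlog_format should be set to row")
    # binlog_row_image defaults to full in MySQL 5.7, so a missing key is fine.
    if settings.get("binlog_row_image", "full") != "full":
        problems.append("binlog_row_image must be full")
    return problems


# The same lines as in the post, used here as sample input.
SAMPLE = """
[mysqld]
server-id         = 123
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10
"""

if __name__ == "__main__":
    print(check_mysqld_settings(SAMPLE) or "binlog settings look fine")
```

An empty list means the fragment has everything this post asks for.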


Set server-id to any value that is unique among your MySQL servers. Save the file and exit.
Restart the MySQL server:


sudo /etc/init.d/mysql restart

or by this: 

sudo service mysql restart 


Now open a terminal and confirm that the server picked up the server-id you set. The command below prints the server options with their effective values; look for server-id in the output and check that it matches:


mysqld --verbose --help | grep server-id


Now check that the binary log is enabled:

mysqladmin variables -uroot -p | grep log_bin

Remove -p if your database has no password for the root user.
The output should list log_bin with the value ON.



Log in to the MySQL server and create a dedicated user for the connector we are going to create:

mysql -u root -p
Remove the -p option if root has no password.
Then create the debezium user with the password dbz:

mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

The MySQL configuration is now done.

Step 3:

 cd to your Kafka directory and follow the steps below in order, running each long-lived command in its own terminal:

  • Start ZooKeeper:
    bin/zookeeper-server-start.sh config/zookeeper.properties

  • Start the default Kafka broker:
    bin/kafka-server-start.sh config/server.properties

    and wait a moment for the startup to finish.
  • Create a topic named test:

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    

  • Inside the kafka-dir/config path, open the connect-standalone.properties file and add this:
    plugin.path=/your_debezium_folder_container_folder_path

    This is not the Debezium folder itself but the folder that contains it, so be careful. In my case, I put the Debezium folder inside the Kafka folder, so my path was:
    plugin.path=/media/t***t/projects/playground/kafka/kafka_2.12-2.4.0

    Save the file and check that
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true

    are both set to true.
  • Change the topic name to "test" in the connect-console-source.properties and connect-console-sink.properties files [not mandatory].
    Now start the connector:
    bin/connect-standalone.sh config/connect-standalone.properties config/connect-console-source.properties config/connect-console-sink.properties
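The plugin.path rule above (point at the containing folder, not at the Debezium folder itself) is easy to get wrong, so here is a small Python sketch that checks it. It only looks one level below the given path for the MySQL connector jar; the function name find_mysql_connector_dirs is made up for this example, and the check is my own shortcut, not how Kafka Connect scans for plugins.

```python
from pathlib import Path


def find_mysql_connector_dirs(plugin_path):
    """Return the subfolders of plugin_path that hold the Debezium MySQL connector jar."""
    root = Path(plugin_path)
    return sorted(
        {jar.parent.name for jar in root.glob("*/debezium-connector-mysql-*.jar")}
    )


if __name__ == "__main__":
    import sys

    # Pass the value you put in plugin.path as the first argument.
    path = sys.argv[1] if len(sys.argv) > 1 else "."
    found = find_mysql_connector_dirs(path)
    print(found or "no Debezium MySQL connector found one level below this path")
```

If you pass the Debezium folder itself instead of its parent, the result is empty, which is exactly the mistake to watch for.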
    
Step 4:
 Kafka Connect setup [last but very important]:
  Copy this curl request and run it:


curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d '{
      "name": "mysql-connector",
      "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": "localhost",
            "database.port": "3306",
            "database.user": "debezium",
            "database.password": "dbz",
            "database.server.id": "42",
            "database.server.name": "test",
            "database.history.kafka.bootstrap.servers": "localhost:9092",
            "database.history.kafka.topic": "dbhistory.test",
            "include.schema.changes": "true"
       }
    }'
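If you prefer to send the same request from code, here is a rough Python equivalent using only the standard library. The helper names build_register_request and register are my own; the URL and the config values are the ones from the curl call. Running the file only prints the request; call register() yourself once Kafka Connect is listening on port 8083.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors/"

# Same settings as the curl request in this post.
MYSQL_CONFIG = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "42",
    "database.server.name": "test",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.test",
    "include.schema.changes": "true",
}


def build_register_request(name, config, url=CONNECT_URL):
    """Build the POST request that registers a connector with Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


def register(name, config, url=CONNECT_URL):
    """Send the request; needs a running Kafka Connect worker."""
    with urllib.request.urlopen(build_register_request(name, config, url)) as resp:
        return resp.status, resp.read().decode("utf-8")


if __name__ == "__main__":
    request = build_register_request("mysql-connector", MYSQL_CONFIG)
    print(request.get_method(), request.full_url)
    print(request.data.decode("utf-8"))
```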


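Most fields here are self-explanatory. database.server.id is the ID the connector itself uses when it joins MySQL as a replication client, so it must be unique and must not be the same as the server-id you set in your my.cnf file [here 42 versus 123]. database.server.name is a logical name that becomes the prefix of every topic the connector writes to.
To check that your connector setup is OK, run this curl request to check its status: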


curl -s "http://localhost:8083/connectors" | jq '.[]' | xargs -I{} curl -s "http://localhost:8083/connectors/{}/status" | jq -c -M '[.name,.connector.state,.tasks[].state] | join(":|:")' | column -s : -t | tr -d \" | sort

If everything is OK, you will see the connector state and the task state both reported as RUNNING.
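The same check can be done on the JSON that the /connectors/<name>/status endpoint returns. Below is a small Python sketch; SAMPLE_STATUS is a hand-written example of that response shape (the worker_id values are made up), and the function names are only illustrative.

```python
def summarize_status(status):
    """Return 'name | connector state | task states...' for one status response."""
    task_states = [task["state"] for task in status.get("tasks", [])]
    return " | ".join([status["name"], status["connector"]["state"]] + task_states)


def is_running(status):
    """True only when the connector and every one of its tasks are RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status["connector"]["state"] == "RUNNING"
        and bool(tasks)
        and all(task["state"] == "RUNNING" for task in tasks)
    )


# Hand-written sample of a status response, not captured from a real run.
SAMPLE_STATUS = {
    "name": "mysql-connector",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
    "type": "source",
}

if __name__ == "__main__":
    print(summarize_status(SAMPLE_STATUS))
    print("healthy" if is_running(SAMPLE_STATUS) else "check the Connect log")
```

A connector with no tasks yet counts as not running here, since nothing is streaming in that state.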
Assuming your connector setup is okay, now check the topic list. You will see one topic per table, named with the database.server.name value as a prefix, followed by the database name and the table name. For example, our server name is test, and if a few of my tables are t1, t2, t3 in the database dbName, the topics will be:
test.dbName.t1, test.dbName.t2, etc...
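The naming rule is simple enough to write down as code. This is just an illustration in Python with made-up helper names; it assumes none of the three parts contains a dot of its own.

```python
def table_topic(server_name, database, table):
    """Build the topic name for one table: <server name>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"


def split_table_topic(topic):
    """Split a table topic back into (server name, database, table)."""
    server_name, database, table = topic.split(".", 2)
    return server_name, database, table


if __name__ == "__main__":
    for table in ("t1", "t2", "t3"):
        print(table_topic("test", "dbName", table))
```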
List all topics:


bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Now, to test streaming, pick any topic named after one of your tables and run the console consumer:


bin/kafka-console-consumer.sh --bootstrap-server "localhost:9092" --topic "test.tigerhrm.tigerhrm_organization" --from-beginning

Now update or insert a row in that table and you will get a response like this [check the after key for your changes]:


{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"struct",
            "fields":[
               {
                  "type":"int64",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"code"
               },
               {
                  "type":"int32",
                  "optional":true,
                  "default":1,
                  "field":"object_version_id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"type"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"name_en"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"name_bn"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"address"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"email"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"contact"
               },
               {
                  "type":"int16",
                  "optional":true,
                  "field":"status"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"short_name"
               }
            ],
            "optional":true,
            "name":"test.tigerhrm.tigerhrm_organization.Value",
            "field":"before"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"int64",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"code"
               },
               {
                  "type":"int32",
                  "optional":true,
                  "default":1,
                  "field":"object_version_id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"type"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"name_en"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"name_bn"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"address"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"email"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"contact"
               },
               {
                  "type":"int16",
                  "optional":true,
                  "field":"status"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"short_name"
               }
            ],
            "optional":true,
            "name":"test.tigerhrm.tigerhrm_organization.Value",
            "field":"after"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":true,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"server_id"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_sec"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"gtid"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"file"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"pos"
               },
               {
                  "type":"int32",
                  "optional":false,
                  "field":"row"
               },
               {
                  "type":"boolean",
                  "optional":true,
                  "default":false,
                  "field":"snapshot"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"thread"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"table"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"query"
               }
            ],
            "optional":false,
            "name":"io.debezium.connector.mysql.Source",
            "field":"source"
         },
         {
            "type":"string",
            "optional":false,
            "field":"op"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
         }
      ],
      "optional":false,
      "name":"test.tigerhrm.tigerhrm_organization.Envelope"
   },
   "payload":{
      "before":{
         "id":5,
         "code":"500",
         "object_version_id":0,
         "type":"ORG_NAME",
         "name_en":"werwer",
         "name_bn":null,
         "address":"fdgd",
         "email":"a@b.com",
         "contact":null,
         "status":1,
         "short_name":"rewerwerwer"
      },
      "after":{
         "id":5,
         "code":"500",
         "object_version_id":0,
         "type":"ORG_NAME",
         "name_en":"test org",
         "name_bn":null,
         "address":"my demo add",
         "email":"a@b.com",
         "contact":null,
         "status":1,
         "short_name":"rewerwerwer"
      },
      "source":{
         "version":"0.9.2.Final",
         "connector":"mysql",
         "name":"test",
         "server_id":123,
         "ts_sec":1581402039,
         "gtid":null,
         "file":"mysql-bin.000011",
         "pos":5106,
         "row":0,
         "snapshot":false,
         "thread":65,
         "db":"tigerhrm",
         "table":"tigerhrm_organization",
         "query":null
      },
      "op":"u",
      "ts_ms":1581402039640
   }
}
{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":true,
                  "field":"code"
               },
               {
                  "type":"bytes",
                  "optional":true,
                  "field":"authentication"
               }
            ],
            "optional":true,
            "name":"test.tigerhrm.oauth_code.Value",
            "field":"before"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":true,
                  "field":"code"
               },
               {
                  "type":"bytes",
                  "optional":true,
                  "field":"authentication"
               }
            ],
            "optional":true,
            "name":"test.tigerhrm.oauth_code.Value",
            "field":"after"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":true,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"server_id"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_sec"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"gtid"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"file"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"pos"
               },
               {
                  "type":"int32",
                  "optional":false,
                  "field":"row"
               },
               {
                  "type":"boolean",
                  "optional":true,
                  "default":false,
                  "field":"snapshot"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"thread"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"table"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"query"
               }
            ],
            "optional":false,
            "name":"io.debezium.connector.mysql.Source",
            "field":"source"
         },
         {
            "type":"string",
            "optional":false,
            "field":"op"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
         }
      ],
      "optional":false,
      "name":"test.tigerhrm.oauth_code.Envelope"
   },
   "payload":{
      "before":null,
      "after":{
         "code":"22",
         "authentication":"MTIyMzMz"
      },
      "source":{
         "version":"0.9.2.Final",
         "connector":"mysql",
         "name":"test",
         "server_id":123,
         "ts_sec":1581401679,
         "gtid":null,
         "file":"mysql-bin.000011",
         "pos":4792,
         "row":0,
         "snapshot":false,
         "thread":65,
         "db":"tigerhrm",
         "table":"oauth_code",
         "query":null
      },
      "op":"c",
      "ts_ms":1581401679228
   }
}
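To make these events easier to read, here is a short Python sketch that pulls out what actually changed. It assumes the event has the schema/payload wrapper shown above (that is what schemas.enable=true produces). UPDATE_EVENT is a trimmed copy of the first event's payload, and the helper names are my own. Columns of type bytes, like authentication in the second event, arrive as base64 text, so there is a small decoder for those too.

```python
import base64

# Meaning of the "op" codes in the payload.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def changed_columns(event):
    """Return {column: (old value, new value)} for columns that differ."""
    payload = event["payload"]
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


def describe(event):
    """One-line summary: operation, table, and the changed columns."""
    payload = event["payload"]
    source = payload["source"]
    op = OPERATIONS.get(payload["op"], payload["op"])
    return f'{op} on {source["db"]}.{source["table"]}: {changed_columns(event)}'


def decode_bytes(value):
    """Decode a base64-encoded bytes column back to raw bytes."""
    return base64.b64decode(value)


# Trimmed copy of the update event shown above (schema part left out).
UPDATE_EVENT = {
    "payload": {
        "before": {"id": 5, "name_en": "werwer", "address": "fdgd", "status": 1},
        "after": {"id": 5, "name_en": "test org", "address": "my demo add", "status": 1},
        "source": {"db": "tigerhrm", "table": "tigerhrm_organization"},
        "op": "u",
    }
}

if __name__ == "__main__":
    print(describe(UPDATE_EVENT))
    print(decode_bytes("MTIyMzMz"))
```

For an insert, before is null, so every non-null column of after shows up with None as its old value.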


Now you can do whatever you want with your data: parse it, modify it, sink it to another database, and so on.
Hope this helps!
