Kafka Connect to Elasticsearch: Sink Kafka Stream Data to Elasticsearch

In the previous post, Kafka Stream API: MySQL CDC to Apache Kafka with Debezium, we learned how to stream change data from the MySQL change log into a Kafka topic through a Kafka source connector. Now we will learn how to ingest that data into Elasticsearch and index the resulting documents.

So, our target is: 

Change a record in a MySQL table, stream that change to the Kafka topic for that table, then ingest it into Elasticsearch and index it.

I am assuming you already have the services below up and configured as described in the previous post, with compatible versions:

  • ZooKeeper --> Running
  • Kafka Broker --> Running
  • Kafka Connect --> Running (we will rerun it the same way as before, but with an extra config file; see below)

Requirements:
  1. Elasticsearch: v7.0.1
  2. Kibana: v7.0.1
  3. Kafka-Elasticsearch connector jar: kafka-connect-elasticsearch-5.4.0.jar [download the updated Confluent Platform, v5.4.0, and go to confluent folder -> share -> confluent-hub-components -> confluent-kafka-connect-elasticsearch -> lib]

Procedure:

  1. Copy the kafka-connect-elasticsearch-5.4.0.jar file into the Kafka folder where the other connector libraries are kept: kafka-folder -> debezium-connector-mysql [we set this path as plugin.path in the previous post]
  2. Configure the Elasticsearch connector: create a properties file named elasticsearch-connect.properties inside the kafka/config folder with these contents:
    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    # topics: the Kafka topic named after your table, e.g. test.tigerhrm.tigerhrm_team
    topics=test.tigerhrm.tigerhrm_team
    # topic.index.map: <topic>:<index>; Elasticsearch index names must be lowercase
    topic.index.map=test.tigerhrm.tigerhrm_team:test.tigerhrm.tigerhrm_team_index
    connection.url=http://localhost:9200
    type.name=kafka-connect
    key.ignore=true
    schema.ignore=true
    
  3. Run Elasticsearch and Kibana.
  4. Run Kafka Connect:
    bin/connect-standalone.sh config/connect-standalone.properties config/elasticsearch-connect.properties
    
  5. Create the MySQL source connector as in the previous post, i.e. mysql-connector, to stream MySQL data to the Kafka topic.
  6. Check that the connectors are in the RUNNING state:
    curl -s "http://localhost:8083/connectors" | jq -r '.[]' | xargs -I{} curl -s "http://localhost:8083/connectors/{}/status" | jq -c -M '[.name,.connector.state,.tasks[].state] | join(":|:")' | column -s : -t | tr -d \" | sort
    
    You should see something like this:


elasticsearch-sink     |  RUNNING  |  RUNNING
locale-console-source  |  RUNNING  |  RUNNING
mysql-connector        |  RUNNING  |  RUNNING
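
The same status check can also be scripted. Below is a minimal Python sketch, assuming the Kafka Connect REST API is reachable at http://localhost:8083 as in the curl command above. The function names fetch_statuses, format_status_rows and all_running are made up for this illustration and are not part of Kafka Connect.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

# Assumption: Kafka Connect REST API on its default port, as in the curl command.
CONNECT_URL = "http://localhost:8083"


def fetch_statuses(base_url=CONNECT_URL):
    """Return the /status document of every connector registered with Kafka Connect."""
    with urlopen(f"{base_url}/connectors") as resp:
        names = json.load(resp)  # a JSON array of connector names
    statuses = []
    for name in names:
        with urlopen(f"{base_url}/connectors/{quote(name)}/status") as resp:
            statuses.append(json.load(resp))
    return statuses


def format_status_rows(statuses):
    """Build sorted 'name | connector state | task states' rows, like the jq pipeline."""
    width = max((len(s["name"]) for s in statuses), default=0)
    rows = []
    for s in sorted(statuses, key=lambda s: s["name"]):
        states = [s["connector"]["state"]] + [t["state"] for t in s.get("tasks", [])]
        rows.append(f"{s['name']:<{width}}  |  " + "  |  ".join(states))
    return rows


def all_running(statuses):
    """True only if every connector and every one of its tasks reports RUNNING."""
    for s in statuses:
        states = [s["connector"]["state"]] + [t["state"] for t in s.get("tasks", [])]
        if any(state != "RUNNING" for state in states):
            return False
    return True
```

With Kafka Connect up, print("\n".join(format_status_rows(fetch_statuses()))) should list one row per connector, and all_running(fetch_statuses()) gives a single yes/no answer that is convenient in a startup script.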


Now we are all set to test the system.


Query the index we configured in the elasticsearch-connect.properties file:
    curl -XGET 'localhost:9200/test.tigerhrm.tigerhrm_team_index/_search?pretty'
    
Or, go to the Kibana Console and make a GET request for the index named in your config file:
GET /test.tigerhrm.tigerhrm_team_index/_search?pretty
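
The search response is verbose: as the sample output further down shows, each Debezium event is stored with both its schema and its payload, and the actual row data sits under _source.payload.before and _source.payload.after. The following is a rough Python sketch, not part of the original setup, that reduces such a response to a compact list of changes. The function names are hypothetical, and the URL assumes Elasticsearch on localhost:9200 with the index name used above. Documents without a payload, such as the plain test log entries in the sample, are skipped.

```python
import json
from urllib.request import urlopen

# Assumption: Elasticsearch on localhost:9200 and the index name queried above.
SEARCH_URL = "http://localhost:9200/test.tigerhrm.tigerhrm_team_index/_search"


def fetch_hits(url=SEARCH_URL):
    """Run the same _search request as the curl command and return the list of hits."""
    with urlopen(url) as resp:
        return json.load(resp)["hits"]["hits"]


def summarize_changes(hits):
    """Return one summary dict per hit that looks like a Debezium change event."""
    changes = []
    for hit in hits:
        payload = hit.get("_source", {}).get("payload")
        if not isinstance(payload, dict):
            continue  # not a Debezium envelope, e.g. a plain test document
        # With key.ignore=true the connector builds _id as topic+partition+offset.
        topic, partition, offset = hit["_id"].rsplit("+", 2)
        changes.append({
            "topic": topic,
            "partition": int(partition),
            "offset": int(offset),
            "op": payload.get("op"),  # Debezium codes: "c" create, "u" update, "d" delete
            "before": payload.get("before"),
            "after": payload.get("after"),
        })
    # Sort by partition and offset so changes appear in the order Kafka stored them.
    return sorted(changes, key=lambda c: (c["partition"], c["offset"]))
```

Calling summarize_changes(fetch_hits()) against a running cluster returns one entry per change event, which makes it easy to compare the before and after values of a row without scrolling through the repeated schema blocks.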

Kibana shows the response, including your changes, in its output section:


{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+14",
        "_score" : 1.0,
        "_source" : {
          "schema" : {
            "name" : "test.tigerhrm.tigerhrm_team.Envelope",
            "optional" : false,
            "type" : "struct",
            "fields" : [
              {
                "field" : "before",
                "name" : "test.tigerhrm.tigerhrm_team.Value",
                "optional" : true,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "description",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "level",
                    "optional" : true,
                    "type" : "int32"
                  },
                  {
                    "field" : "name",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "leader_id",
                    "optional" : true,
                    "type" : "int64"
                  }
                ]
              },
              {
                "field" : "after",
                "name" : "test.tigerhrm.tigerhrm_team.Value",
                "optional" : true,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "description",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "level",
                    "optional" : true,
                    "type" : "int32"
                  },
                  {
                    "field" : "name",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "leader_id",
                    "optional" : true,
                    "type" : "int64"
                  }
                ]
              },
              {
                "field" : "source",
                "name" : "io.debezium.connector.mysql.Source",
                "optional" : false,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "version",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "connector",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "name",
                    "optional" : false,
                    "type" : "string"
                  },
                  {
                    "field" : "server_id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "ts_sec",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "gtid",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "file",
                    "optional" : false,
                    "type" : "string"
                  },
                  {
                    "field" : "pos",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "row",
                    "optional" : false,
                    "type" : "int32"
                  },
                  {
                    "default" : false,
                    "field" : "snapshot",
                    "optional" : true,
                    "type" : "boolean"
                  },
                  {
                    "field" : "thread",
                    "optional" : true,
                    "type" : "int64"
                  },
                  {
                    "field" : "db",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "table",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "query",
                    "optional" : true,
                    "type" : "string"
                  }
                ]
              },
              {
                "field" : "op",
                "optional" : false,
                "type" : "string"
              },
              {
                "field" : "ts_ms",
                "optional" : true,
                "type" : "int64"
              }
            ]
          },
          "payload" : {
            "op" : "u",
            "before" : {
              "level" : null,
              "name" : "sdfsdf445555",
              "description" : "sdfsd",
              "leader_id" : 199,
              "id" : 43
            },
            "after" : {
              "level" : null,
              "name" : "sdfsdf44555533",
              "description" : "sdfsd",
              "leader_id" : 199,
              "id" : 43
            },
            "source" : {
              "ts_sec" : 1582106694,
              "query" : null,
              "thread" : 111,
              "server_id" : 123,
              "version" : "0.9.2.Final",
              "file" : "mysql-bin.000011",
              "connector" : "mysql",
              "pos" : 11144,
              "name" : "test",
              "gtid" : null,
              "row" : 0,
              "snapshot" : false,
              "db" : "tigerhrm",
              "table" : "tigerhrm_team"
            },
            "ts_ms" : 1582106695098
          }
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+15",
        "_score" : 1.0,
        "_source" : {
          "schema" : {
            "name" : "test.tigerhrm.tigerhrm_team.Envelope",
            "optional" : false,
            "type" : "struct",
            "fields" : [
              {
                "field" : "before",
                "name" : "test.tigerhrm.tigerhrm_team.Value",
                "optional" : true,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "description",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "level",
                    "optional" : true,
                    "type" : "int32"
                  },
                  {
                    "field" : "name",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "leader_id",
                    "optional" : true,
                    "type" : "int64"
                  }
                ]
              },
              {
                "field" : "after",
                "name" : "test.tigerhrm.tigerhrm_team.Value",
                "optional" : true,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "description",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "level",
                    "optional" : true,
                    "type" : "int32"
                  },
                  {
                    "field" : "name",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "leader_id",
                    "optional" : true,
                    "type" : "int64"
                  }
                ]
              },
              {
                "field" : "source",
                "name" : "io.debezium.connector.mysql.Source",
                "optional" : false,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "version",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "connector",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "name",
                    "optional" : false,
                    "type" : "string"
                  },
                  {
                    "field" : "server_id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "ts_sec",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "gtid",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "file",
                    "optional" : false,
                    "type" : "string"
                  },
                  {
                    "field" : "pos",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "row",
                    "optional" : false,
                    "type" : "int32"
                  },
                  {
                    "default" : false,
                    "field" : "snapshot",
                    "optional" : true,
                    "type" : "boolean"
                  },
                  {
                    "field" : "thread",
                    "optional" : true,
                    "type" : "int64"
                  },
                  {
                    "field" : "db",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "table",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "query",
                    "optional" : true,
                    "type" : "string"
                  }
                ]
              },
              {
                "field" : "op",
                "optional" : false,
                "type" : "string"
              },
              {
                "field" : "ts_ms",
                "optional" : true,
                "type" : "int64"
              }
            ]
          },
          "payload" : {
            "op" : "u",
            "before" : {
              "level" : null,
              "name" : "sdfsdf44555533",
              "description" : "sdfsd",
              "leader_id" : 199,
              "id" : 43
            },
            "after" : {
              "level" : null,
              "name" : "sdfsdf445",
              "description" : "sdfsd",
              "leader_id" : 199,
              "id" : 43
            },
            "source" : {
              "ts_sec" : 1582106853,
              "query" : null,
              "thread" : 111,
              "server_id" : 123,
              "version" : "0.9.2.Final",
              "file" : "mysql-bin.000011",
              "connector" : "mysql",
              "pos" : 11496,
              "name" : "test",
              "gtid" : null,
              "row" : 0,
              "snapshot" : false,
              "db" : "tigerhrm",
              "table" : "tigerhrm_team"
            },
            "ts_ms" : 1582106853293
          }
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+11",
        "_score" : 1.0,
        "_source" : {
          "schema" : {
            "name" : "test.tigerhrm.tigerhrm_team.Envelope",
            "optional" : false,
            "type" : "struct",
            "fields" : [
              {
                "field" : "before",
                "name" : "test.tigerhrm.tigerhrm_team.Value",
                "optional" : true,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "description",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "level",
                    "optional" : true,
                    "type" : "int32"
                  },
                  {
                    "field" : "name",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "leader_id",
                    "optional" : true,
                    "type" : "int64"
                  }
                ]
              },
              {
                "field" : "after",
                "name" : "test.tigerhrm.tigerhrm_team.Value",
                "optional" : true,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "description",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "level",
                    "optional" : true,
                    "type" : "int32"
                  },
                  {
                    "field" : "name",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "leader_id",
                    "optional" : true,
                    "type" : "int64"
                  }
                ]
              },
              {
                "field" : "source",
                "name" : "io.debezium.connector.mysql.Source",
                "optional" : false,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "version",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "connector",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "name",
                    "optional" : false,
                    "type" : "string"
                  },
                  {
                    "field" : "server_id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "ts_sec",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "gtid",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "file",
                    "optional" : false,
                    "type" : "string"
                  },
                  {
                    "field" : "pos",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "row",
                    "optional" : false,
                    "type" : "int32"
                  },
                  {
                    "default" : false,
                    "field" : "snapshot",
                    "optional" : true,
                    "type" : "boolean"
                  },
                  {
                    "field" : "thread",
                    "optional" : true,
                    "type" : "int64"
                  },
                  {
                    "field" : "db",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "table",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "query",
                    "optional" : true,
                    "type" : "string"
                  }
                ]
              },
              {
                "field" : "op",
                "optional" : false,
                "type" : "string"
              },
              {
                "field" : "ts_ms",
                "optional" : true,
                "type" : "int64"
              }
            ]
          },
          "payload" : {
            "op" : "u",
            "before" : {
              "level" : null,
              "name" : "sdfsdf44",
              "description" : "sdfsd",
              "leader_id" : 199,
              "id" : 43
            },
            "after" : {
              "level" : null,
              "name" : "sdfsdf4455",
              "description" : "sdfsd",
              "leader_id" : 199,
              "id" : 43
            },
            "source" : {
              "ts_sec" : 1582096703,
              "query" : null,
              "thread" : 111,
              "server_id" : 123,
              "version" : "0.9.2.Final",
              "file" : "mysql-bin.000011",
              "connector" : "mysql",
              "pos" : 10452,
              "name" : "test",
              "gtid" : null,
              "row" : 0,
              "snapshot" : false,
              "db" : "tigerhrm",
              "table" : "tigerhrm_team"
            },
            "ts_ms" : 1582096703433
          }
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+12",
        "_score" : 1.0,
        "_source" : {
          "severity" : "WARN",
          "name" : "Test log 2"
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+13",
        "_score" : 1.0,
        "_source" : {
          "schema" : {
            "name" : "test.tigerhrm.tigerhrm_team.Envelope",
            "optional" : false,
            "type" : "struct",
            "fields" : [
              {
                "field" : "before",
                "name" : "test.tigerhrm.tigerhrm_team.Value",
                "optional" : true,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "description",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "level",
                    "optional" : true,
                    "type" : "int32"
                  },
                  {
                    "field" : "name",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "leader_id",
                    "optional" : true,
                    "type" : "int64"
                  }
                ]
              },
              {
                "field" : "after",
                "name" : "test.tigerhrm.tigerhrm_team.Value",
                "optional" : true,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "description",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "level",
                    "optional" : true,
                    "type" : "int32"
                  },
                  {
                    "field" : "name",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "leader_id",
                    "optional" : true,
                    "type" : "int64"
                  }
                ]
              },
              {
                "field" : "source",
                "name" : "io.debezium.connector.mysql.Source",
                "optional" : false,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "version",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "connector",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "name",
                    "optional" : false,
                    "type" : "string"
                  },
                  {
                    "field" : "server_id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "ts_sec",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "gtid",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "file",
                    "optional" : false,
                    "type" : "string"
                  },
                  {
                    "field" : "pos",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "row",
                    "optional" : false,
                    "type" : "int32"
                  },
                  {
                    "default" : false,
                    "field" : "snapshot",
                    "optional" : true,
                    "type" : "boolean"
                  },
                  {
                    "field" : "thread",
                    "optional" : true,
                    "type" : "int64"
                  },
                  {
                    "field" : "db",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "table",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "query",
                    "optional" : true,
                    "type" : "string"
                  }
                ]
              },
              {
                "field" : "op",
                "optional" : false,
                "type" : "string"
              },
              {
                "field" : "ts_ms",
                "optional" : true,
                "type" : "int64"
              }
            ]
          },
          "payload" : {
            "op" : "u",
            "before" : {
              "level" : null,
              "name" : "sdfsdf4455",
              "description" : "sdfsd",
              "leader_id" : 199,
              "id" : 43
            },
            "after" : {
              "level" : null,
              "name" : "sdfsdf445555",
              "description" : "sdfsd",
              "leader_id" : 199,
              "id" : 43
            },
            "source" : {
              "ts_sec" : 1582100758,
              "query" : null,
              "thread" : 111,
              "server_id" : 123,
              "version" : "0.9.2.Final",
              "file" : "mysql-bin.000011",
              "connector" : "mysql",
              "pos" : 10796,
              "name" : "test",
              "gtid" : null,
              "row" : 0,
              "snapshot" : false,
              "db" : "tigerhrm",
              "table" : "tigerhrm_team"
            },
            "ts_ms" : 1582106650495
          }
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+0+8",
        "_score" : 1.0,
        "_source" : {
          "severity" : "INFO",
          "name" : "Test log"
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+10",
        "_score" : 1.0,
        "_source" : {
          "schema" : {
            "name" : "test.tigerhrm.tigerhrm_team.Envelope",
            "optional" : false,
            "type" : "struct",
            "fields" : [
              {
                "field" : "before",
                "name" : "test.tigerhrm.tigerhrm_team.Value",
                "optional" : true,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "description",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "level",
                    "optional" : true,
                    "type" : "int32"
                  },
                  {
                    "field" : "name",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "leader_id",
                    "optional" : true,
                    "type" : "int64"
                  }
                ]
              },
              {
                "field" : "after",
                "name" : "test.tigerhrm.tigerhrm_team.Value",
                "optional" : true,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "description",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "level",
                    "optional" : true,
                    "type" : "int32"
                  },
                  {
                    "field" : "name",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "leader_id",
                    "optional" : true,
                    "type" : "int64"
                  }
                ]
              },
              {
                "field" : "source",
                "name" : "io.debezium.connector.mysql.Source",
                "optional" : false,
                "type" : "struct",
                "fields" : [
                  {
                    "field" : "version",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "connector",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "name",
                    "optional" : false,
                    "type" : "string"
                  },
                  {
                    "field" : "server_id",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "ts_sec",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "gtid",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "file",
                    "optional" : false,
                    "type" : "string"
                  },
                  {
                    "field" : "pos",
                    "optional" : false,
                    "type" : "int64"
                  },
                  {
                    "field" : "row",
                    "optional" : false,
                    "type" : "int32"
                  },
                  {
                    "default" : false,
                    "field" : "snapshot",
                    "optional" : true,
                    "type" : "boolean"
                  },
                  {
                    "field" : "thread",
                    "optional" : true,
                    "type" : "int64"
                  },
                  {
                    "field" : "db",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "table",
                    "optional" : true,
                    "type" : "string"
                  },
                  {
                    "field" : "query",
                    "optional" : true,
                    "type" : "string"
                  }
                ]
              },
              {
                "field" : "op",
                "optional" : false,
                "type" : "string"
              },
              {
                "field" : "ts_ms",
                "optional" : true,
                "type" : "int64"
              }
            ]
          },
          "payload" : {
            "op" : "u",
            "before" : {
              "level" : null,
              "name" : "sdfsdf",
              "description" : "sdfsd",
              "leader_id" : 199,
              "id" : 43
            },
            "after" : {
              "level" : null,
              "name" : "sdfsdf44",
              "description" : "sdfsd",
              "leader_id" : 199,
              "id" : 43
            },
            "source" : {
              "ts_sec" : 1582096597,
              "query" : null,
              "thread" : 111,
              "server_id" : 123,
              "version" : "0.9.2.Final",
              "file" : "mysql-bin.000011",
              "connector" : "mysql",
              "pos" : 10112,
              "name" : "test",
              "gtid" : null,
              "row" : 0,
              "snapshot" : false,
              "db" : "tigerhrm",
              "table" : "tigerhrm_team"
            },
            "ts_ms" : 1582096670417
          }
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+1+7",
        "_score" : 1.0,
        "_source" : {
          "severity" : "WARN",
          "name" : "Test log 2"
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+16",
        "_score" : 1.0,
        "_source" : {
          "op" : "u",
          "before" : {
            "level" : null,
            "name" : "sdfsdf445",
            "description" : "sdfsd",
            "leader_id" : 199,
            "id" : 43
          },
          "after" : {
            "level" : null,
            "name" : "11223344",
            "description" : "sdfsd",
            "leader_id" : 199,
            "id" : 43
          },
          "source" : {
            "ts_sec" : 1582109554,
            "query" : null,
            "thread" : 111,
            "server_id" : 123,
            "version" : "0.9.2.Final",
            "file" : "mysql-bin.000011",
            "connector" : "mysql",
            "pos" : 11845,
            "name" : "test",
            "gtid" : null,
            "row" : 0,
            "snapshot" : false,
            "db" : "tigerhrm",
            "table" : "tigerhrm_team"
          },
          "ts_ms" : 1582109605979
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+17",
        "_score" : 1.0,
        "_source" : {
          "op" : "u",
          "before" : {
            "level" : null,
            "name" : "11223344",
            "description" : "sdfsd",
            "leader_id" : 199,
            "id" : 43
          },
          "after" : {
            "level" : null,
            "name" : "1223334444",
            "description" : "sdfsd",
            "leader_id" : 199,
            "id" : 43
          },
          "source" : {
            "ts_sec" : 1582109632,
            "query" : null,
            "thread" : 111,
            "server_id" : 123,
            "version" : "0.9.2.Final",
            "file" : "mysql-bin.000011",
            "connector" : "mysql",
            "pos" : 12188,
            "name" : "test",
            "gtid" : null,
            "row" : 0,
            "snapshot" : false,
            "db" : "tigerhrm",
            "table" : "tigerhrm_team"
          },
          "ts_ms" : 1582109632844
        }
      }
    ]
  }
}
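
The hits above come in three shapes: Debezium change events wrapped in a "schema"/"payload" envelope, change events stored flat (with "op", "before", "after" at the top level), and an unrelated test document. As a rough sketch of how a consumer of this index could pull out just the post-change row from each hit, the snippet below walks a list of hits and returns the "after" state where one exists. The function names unwrap_envelope and after_states and the trimmed sample_hits list are hypothetical, introduced only for illustration. The hits are assumed to be already parsed from the JSON response into Python dictionaries.

```python
from typing import Any, Dict, List


def unwrap_envelope(source: Dict[str, Any]) -> Any:
    """Return the change event, stripping the schema/payload wrapper if present."""
    if "schema" in source and "payload" in source:
        return source["payload"]
    return source


def after_states(hits: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Collect the 'after' row of every hit that looks like a Debezium change event."""
    rows = []
    for hit in hits:
        envelope = unwrap_envelope(hit.get("_source", {}))
        # Skip documents that are not change events (no "op" field),
        # such as the hand-written test log document.
        if not isinstance(envelope, dict) or "op" not in envelope:
            continue
        after = envelope.get("after")
        # "after" is null for delete events, so there is no row to keep.
        if after is not None:
            rows.append(after)
    return rows


# Trimmed-down copies of the hits shown above (most fields omitted).
sample_hits = [
    {"_source": {"schema": {"type": "struct"},
                 "payload": {"op": "u", "after": {"id": 43, "name": "sdfsdf44"}}}},
    {"_source": {"severity": "WARN", "name": "Test log 2"}},
    {"_source": {"op": "u", "after": {"id": 43, "name": "11223344"}}},
    {"_source": {"op": "u", "after": {"id": 43, "name": "1223334444"}}},
]

rows = after_states(sample_hits)
print([row["name"] for row in rows])
```

With a real response you would pass response["hits"]["hits"] instead of sample_hits. Checking for the "op" field is only a heuristic for recognising a change event; a stricter check may be needed if other documents in the index also carry a field with that name.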


That's all.



