Kafka Connect to Elasticsearch: Sink Kafka Stream Data to Elasticsearch
In the previous post, Kafka Stream API: MySQL CDC to apache Kafka with debezium, we learned how to stream data from the MySQL change log into a Kafka topic using a Kafka source connector. Now we will learn how to ingest this data into Elasticsearch and index those documents.
So, our target is: when a record in a MySQL table changes, publish that change to the Kafka topic for that table, then ingest it into Elasticsearch and index it.
I assume you already have the following services up and configured, with compatible versions, from the previous post:
- ZooKeeper: running
- Kafka broker: running
- Kafka Connect: running (we will restart it the same way as before, but with an extra config file; see below)
Requirements:
- Elasticsearch: 7.0.1
- Kibana: 7.0.1
- Kafka Connect Elasticsearch connector jar: kafka-connect-elasticsearch-5.4.0.jar (download Confluent Platform 5.4.0 and look in confluent/share/confluent-hub-components/confluent-kafka-connect-elasticsearch/lib)
Procedure:
- Copy kafka-connect-elasticsearch-5.4.0.jar into the Kafka folder where the other connector libraries are kept: kafka-folder/debezium-connector-mysql (the directory we set as plugin.path in the previous post). The other jars in the same lib folder are the connector's dependencies; if Kafka Connect cannot load the connector class, copy them as well.
- Configure the Elasticsearch connector: create a properties file named elasticsearch-connect.properties in the kafka/config folder (contents below).
- Start Elasticsearch and Kibana.
- Run Kafka Connect (command below).
- Create the MySQL source connector (mysql-connector), as in the previous post, so that MySQL changes are published to the Kafka topic.
- Check that every connector and its tasks are in the RUNNING state (command below).
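Contents of config/elasticsearch-connect.properties (the topic and index names follow the example used in this post; replace them with your own):
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
# The topic Debezium created for your table (server.database.table)
topics=test.tigerhrm.tigerhrm_team
# topic:index pairs; Elasticsearch index names must be lowercase
topic.index.map=test.tigerhrm.tigerhrm_team:test.tigerhrm.tigerhrm_team_index
connection.url=http://localhost:9200
type.name=kafka-connect
key.ignore=true
schema.ignore=true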
bin/connect-standalone.sh config/connect-standalone.properties config/elasticsearch-connect.properties
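Once a Connect worker is running, you could also create the same sink through the Kafka Connect REST API (POST /connectors on port 8083) instead of passing a properties file on the command line. Below is a minimal Python sketch that uses only the standard library. The topic and index names are assumptions taken from this post's example, and the helper names are hypothetical. In standalone mode, connectors created over REST are held in memory only, so you have to re-create them after a restart.

```python
import json
import urllib.request

# Same settings as elasticsearch-connect.properties; the topic/index names are
# assumptions based on this post's example and should be replaced with your own.
SINK_PROPERTIES = """
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test.tigerhrm.tigerhrm_team
topic.index.map=test.tigerhrm.tigerhrm_team:test.tigerhrm.tigerhrm_team_index
connection.url=http://localhost:9200
type.name=kafka-connect
key.ignore=true
schema.ignore=true
"""


def parse_properties(text: str) -> dict:
    """Minimal key=value parser: skips blank lines and comments, splits on the
    first '='. It does not implement the full Java .properties syntax."""
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def build_create_request(props: dict) -> dict:
    """Body for POST /connectors: the connector name plus its config map."""
    config = {key: value for key, value in props.items() if key != "name"}
    return {"name": props["name"], "config": config}


def create_connector(body: dict, connect_url: str = "http://localhost:8083") -> dict:
    """Submit the connector to a running Kafka Connect worker and return its reply."""
    request = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


if __name__ == "__main__":
    body = build_create_request(parse_properties(SINK_PROPERTIES))
    print(json.dumps(body, indent=2))
    # create_connector(body)  # uncomment when Kafka Connect is running on :8083
```

Running the script only prints the request body. The call that actually creates the connector is left commented out, so you can review the JSON first.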
Command to check the connector states (xargs substitutes each connector name for {connector_name}):
curl -s "http://localhost:8083/connectors" | \
  jq '.[]' | \
  xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status" | \
  jq -c -M '[.name,.connector.state,.tasks[].state] | join(":|:")' | \
  column -s : -t | tr -d \" | sort
You should see something like this:
elasticsearch-sink     |  RUNNING  |  RUNNING
locale-console-source  |  RUNNING  |  RUNNING
mysql-connector        |  RUNNING  |  RUNNING
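If jq or column is not installed, here is a Python sketch of the same check using only the standard library. It relies on the Kafka Connect REST endpoints GET /connectors and GET /connectors/{name}/status. The worker URL is the one used in this post, and the helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST endpoint used in this post


def get_json(path: str):
    """GET a Kafka Connect REST path and decode the JSON reply."""
    with urllib.request.urlopen(f"{CONNECT_URL}{path}") as response:
        return json.load(response)


def status_row(status: dict) -> str:
    """Same columns as the jq filter: name, connector state, then each task state."""
    columns = [status["name"], status["connector"]["state"]]
    columns += [task["state"] for task in status.get("tasks", [])]
    return " | ".join(columns)


def is_running(status: dict) -> bool:
    """True only if the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return status["connector"]["state"] == "RUNNING" and all(
        task["state"] == "RUNNING" for task in tasks
    )


def print_statuses() -> None:
    """List every connector on the worker, sorted by name, with its states."""
    for name in sorted(get_json("/connectors")):
        status = get_json(f"/connectors/{urllib.parse.quote(name)}/status")
        print(status_row(status))
```

With the worker up, call print_statuses(). It should print one row per connector, similar to the table above. is_running() is useful in scripts that should wait until the sink is ready.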
Now we are all set to test the system.
- Change a record in the MySQL table, then query the index that our topic is mapped to in elasticsearch-connect.properties:
curl -XGET 'localhost:9200/test.tigerhrm.tigerhrm_team_index/_search?pretty'
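You can run the same search from Python. The sketch below builds the URI search URL and returns the hits array. The base URL and index name are assumptions taken from this post's config, and the helper names are hypothetical. Elasticsearch returns at most 10 hits per search unless size is set, so the sketch exposes that parameter.

```python
import json
import urllib.parse
import urllib.request

ES_URL = "http://localhost:9200"  # connection.url from the sink config
INDEX = "test.tigerhrm.tigerhrm_team_index"  # index name used in this post's example


def search_url(index: str, size: int = 10, base_url: str = ES_URL) -> str:
    """Build a URI search URL; Elasticsearch returns 10 hits unless size is set."""
    query = urllib.parse.urlencode({"size": size})
    return f"{base_url}/{urllib.parse.quote(index)}/_search?{query}"


def search_hits(index: str, size: int = 10) -> list:
    """Run the search and return the list under hits.hits."""
    with urllib.request.urlopen(search_url(index, size)) as response:
        return json.load(response)["hits"]["hits"]
```

For example, `for hit in search_hits(INDEX, 50): print(hit["_id"])` lists the document ids once Elasticsearch is running.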
Or, open the Kibana Console (under Dev Tools)
and make a GET request for the index mentioned in your config file:
GET /test.tigerhrm.tigerhrm_team_index/_search?pretty
You will see your changes in the output section:
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 },
  "hits" : {
    "total" : { "value" : 10, "relation" : "eq" },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+14",
        "_score" : 1.0,
        "_source" : {
          "schema" : { "name" : "test.tigerhrm.tigerhrm_team.Envelope", "optional" : false, "type" : "struct", "fields" : [ { "field" : "before", "name" : "test.tigerhrm.tigerhrm_team.Value", "optional" : true, "type" : "struct", "fields" : [ { "field" : "id", "optional" : false, "type" : "int64" }, { "field" : "description", "optional" : true, "type" : "string" }, { "field" : "level", "optional" : true, "type" : "int32" }, { "field" : "name", "optional" : true, "type" : "string" }, { "field" : "leader_id", "optional" : true, "type" : "int64" } ] }, { "field" : "after", "name" : "test.tigerhrm.tigerhrm_team.Value", "optional" : true, "type" : "struct", "fields" : [ { "field" : "id", "optional" : false, "type" : "int64" }, { "field" : "description", "optional" : true, "type" : "string" }, { "field" : "level", "optional" : true, "type" : "int32" }, { "field" : "name", "optional" : true, "type" : "string" }, { "field" : "leader_id", "optional" : true, "type" : "int64" } ] }, { "field" : "source", "name" : "io.debezium.connector.mysql.Source", "optional" : false, "type" : "struct", "fields" : [ { "field" : "version", "optional" : true, "type" : "string" }, { "field" : "connector", "optional" : true, "type" : "string" }, { "field" : "name", "optional" : false, "type" : "string" }, { "field" : "server_id", "optional" : false, "type" : "int64" }, { "field" : "ts_sec", "optional" : false, "type" : "int64" }, { "field" : "gtid", "optional" : true, "type" : "string" }, { "field" : "file", "optional" : false, "type" : "string" }, { "field" : "pos", "optional" : false, "type" : "int64" }, { "field" : "row", "optional" : false, "type" : "int32" }, { "default" : false, "field" : "snapshot", "optional" : true, "type" : "boolean" }, { "field" : "thread", "optional" : true, "type" : "int64" }, { "field" : "db", "optional" : true, "type" : "string" }, { "field" : "table", "optional" : true, "type" : "string" }, { "field" : "query", "optional" : true, "type" : "string" } ] }, { "field" : "op", "optional" : false, "type" : "string" }, { "field" : "ts_ms", "optional" : true, "type" : "int64" } ] },
          "payload" : {
            "op" : "u",
            "before" : { "level" : null, "name" : "sdfsdf445555", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
            "after" : { "level" : null, "name" : "sdfsdf44555533", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
            "source" : { "ts_sec" : 1582106694, "query" : null, "thread" : 111, "server_id" : 123, "version" : "0.9.2.Final", "file" : "mysql-bin.000011", "connector" : "mysql", "pos" : 11144, "name" : "test", "gtid" : null, "row" : 0, "snapshot" : false, "db" : "tigerhrm", "table" : "tigerhrm_team" },
            "ts_ms" : 1582106695098
          }
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+15",
        "_score" : 1.0,
        "_source" : {
          "schema" : { "name" : "test.tigerhrm.tigerhrm_team.Envelope", "optional" : false, "type" : "struct", "fields" : [ { "field" : "before", "name" : "test.tigerhrm.tigerhrm_team.Value", "optional" : true, "type" : "struct", "fields" : [ { "field" : "id", "optional" : false, "type" : "int64" }, { "field" : "description", "optional" : true, "type" : "string" }, { "field" : "level", "optional" : true, "type" : "int32" }, { "field" : "name", "optional" : true, "type" : "string" }, { "field" : "leader_id", "optional" : true, "type" : "int64" } ] }, { "field" : "after", "name" : "test.tigerhrm.tigerhrm_team.Value", "optional" : true, "type" : "struct", "fields" : [ { "field" : "id", "optional" : false, "type" : "int64" }, { "field" : "description", "optional" : true, "type" : "string" }, { "field" : "level", "optional" : true, "type" : "int32" }, { "field" : "name", "optional" : true, "type" : "string" }, { "field" : "leader_id", "optional" : true, "type" : "int64" } ] }, { "field" : "source", "name" : "io.debezium.connector.mysql.Source", "optional" : false, "type" : "struct", "fields" : [ { "field" : "version", "optional" : true, "type" : "string" }, { "field" : "connector", "optional" : true, "type" : "string" }, { "field" : "name", "optional" : false, "type" : "string" }, { "field" : "server_id", "optional" : false, "type" : "int64" }, { "field" : "ts_sec", "optional" : false, "type" : "int64" }, { "field" : "gtid", "optional" : true, "type" : "string" }, { "field" : "file", "optional" : false, "type" : "string" }, { "field" : "pos", "optional" : false, "type" : "int64" }, { "field" : "row", "optional" : false, "type" : "int32" }, { "default" : false, "field" : "snapshot", "optional" : true, "type" : "boolean" }, { "field" : "thread", "optional" : true, "type" : "int64" }, { "field" : "db", "optional" : true, "type" : "string" }, { "field" : "table", "optional" : true, "type" : "string" }, { "field" : "query", "optional" : true, "type" : "string" } ] }, { "field" : "op", "optional" : false, "type" : "string" }, { "field" : "ts_ms", "optional" : true, "type" : "int64" } ] },
          "payload" : {
            "op" : "u",
            "before" : { "level" : null, "name" : "sdfsdf44555533", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
            "after" : { "level" : null, "name" : "sdfsdf445", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
            "source" : { "ts_sec" : 1582106853, "query" : null, "thread" : 111, "server_id" : 123, "version" : "0.9.2.Final", "file" : "mysql-bin.000011", "connector" : "mysql", "pos" : 11496, "name" : "test", "gtid" : null, "row" : 0, "snapshot" : false, "db" : "tigerhrm", "table" : "tigerhrm_team" },
            "ts_ms" : 1582106853293
          }
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+11",
        "_score" : 1.0,
        "_source" : {
          "schema" : { "name" : "test.tigerhrm.tigerhrm_team.Envelope", "optional" : false, "type" : "struct", "fields" : [ { "field" : "before", "name" : "test.tigerhrm.tigerhrm_team.Value", "optional" : true, "type" : "struct", "fields" : [ { "field" : "id", "optional" : false, "type" : "int64" }, { "field" : "description", "optional" : true, "type" : "string" }, { "field" : "level", "optional" : true, "type" : "int32" }, { "field" : "name", "optional" : true, "type" : "string" }, { "field" : "leader_id", "optional" : true, "type" : "int64" } ] }, { "field" : "after", "name" : "test.tigerhrm.tigerhrm_team.Value", "optional" : true, "type" : "struct", "fields" : [ { "field" : "id", "optional" : false, "type" : "int64" }, { "field" : "description", "optional" : true, "type" : "string" }, { "field" : "level", "optional" : true, "type" : "int32" }, { "field" : "name", "optional" : true, "type" : "string" }, { "field" : "leader_id", "optional" : true, "type" : "int64" } ] }, { "field" : "source", "name" : "io.debezium.connector.mysql.Source", "optional" : false, "type" : "struct", "fields" : [ { "field" : "version", "optional" : true, "type" : "string" }, { "field" : "connector", "optional" : true, "type" : "string" }, { "field" : "name", "optional" : false, "type" : "string" }, { "field" : "server_id", "optional" : false, "type" : "int64" }, { "field" : "ts_sec", "optional" : false, "type" : "int64" }, { "field" : "gtid", "optional" : true, "type" : "string" }, { "field" : "file", "optional" : false, "type" : "string" }, { "field" : "pos", "optional" : false, "type" : "int64" }, { "field" : "row", "optional" : false, "type" : "int32" }, { "default" : false, "field" : "snapshot", "optional" : true, "type" : "boolean" }, { "field" : "thread", "optional" : true, "type" : "int64" }, { "field" : "db", "optional" : true, "type" : "string" }, { "field" : "table", "optional" : true, "type" : "string" }, { "field" : "query", "optional" : true, "type" : "string" } ] }, { "field" : "op", "optional" : false, "type" : "string" }, { "field" : "ts_ms", "optional" : true, "type" : "int64" } ] },
          "payload" : {
            "op" : "u",
            "before" : { "level" : null, "name" : "sdfsdf44", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
            "after" : { "level" : null, "name" : "sdfsdf4455", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
            "source" : { "ts_sec" : 1582096703, "query" : null, "thread" : 111, "server_id" : 123, "version" : "0.9.2.Final", "file" : "mysql-bin.000011", "connector" : "mysql", "pos" : 10452, "name" : "test", "gtid" : null, "row" : 0, "snapshot" : false, "db" : "tigerhrm", "table" : "tigerhrm_team" },
            "ts_ms" : 1582096703433
          }
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+12",
        "_score" : 1.0,
        "_source" : { "severity" : "WARN", "name" : "Test log 2" }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+13",
        "_score" : 1.0,
        "_source" : {
          "schema" : { "name" : "test.tigerhrm.tigerhrm_team.Envelope", "optional" : false, "type" : "struct", "fields" : [ { "field" : "before", "name" : "test.tigerhrm.tigerhrm_team.Value", "optional" : true, "type" : "struct", "fields" : [ { "field" : "id", "optional" : false, "type" : "int64" }, { "field" : "description", "optional" : true, "type" : "string" }, { "field" : "level", "optional" : true, "type" : "int32" }, { "field" : "name", "optional" : true, "type" : "string" }, { "field" : "leader_id", "optional" : true, "type" : "int64" } ] }, { "field" : "after", "name" : "test.tigerhrm.tigerhrm_team.Value", "optional" : true, "type" : "struct", "fields" : [ { "field" : "id", "optional" : false, "type" : "int64" }, { "field" : "description", "optional" : true, "type" : "string" }, { "field" : "level", "optional" : true, "type" : "int32" }, { "field" : "name", "optional" : true, "type" : "string" }, { "field" : "leader_id", "optional" : true, "type" : "int64" } ] }, { "field" : "source", "name" : "io.debezium.connector.mysql.Source", "optional" : false, "type" : "struct", "fields" : [ { "field" : "version", "optional" : true, "type" : "string" }, { "field" : "connector", "optional" : true, "type" : "string" }, { "field" : "name", "optional" : false, "type" : "string" }, { "field" : "server_id", "optional" : false, "type" : "int64" }, { "field" : "ts_sec", "optional" : false, "type" : "int64" }, { "field" : "gtid", "optional" : true, "type" : "string" }, { "field" : "file", "optional" : false, "type" : "string" }, { "field" : "pos", "optional" : false, "type" : "int64" }, { "field" : "row", "optional" : false, "type" : "int32" }, { "default" : false, "field" : "snapshot", "optional" : true, "type" : "boolean" }, { "field" : "thread", "optional" : true, "type" : "int64" }, { "field" : "db", "optional" : true, "type" : "string" }, { "field" : "table", "optional" : true, "type" : "string" }, { "field" : "query", "optional" : true, "type" : "string" } ] }, { "field" : "op", "optional" : false, "type" : "string" }, { "field" : "ts_ms", "optional" : true, "type" : "int64" } ] },
          "payload" : {
            "op" : "u",
            "before" : { "level" : null, "name" : "sdfsdf4455", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
            "after" : { "level" : null, "name" : "sdfsdf445555", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
            "source" : { "ts_sec" : 1582100758, "query" : null, "thread" : 111, "server_id" : 123, "version" : "0.9.2.Final", "file" : "mysql-bin.000011", "connector" : "mysql", "pos" : 10796, "name" : "test", "gtid" : null, "row" : 0, "snapshot" : false, "db" : "tigerhrm", "table" : "tigerhrm_team" },
            "ts_ms" : 1582106650495
          }
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+0+8",
        "_score" : 1.0,
        "_source" : { "severity" : "INFO", "name" : "Test log" }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+10",
        "_score" : 1.0,
        "_source" : {
          "schema" : { "name" : "test.tigerhrm.tigerhrm_team.Envelope", "optional" : false, "type" : "struct", "fields" : [ { "field" : "before", "name" : "test.tigerhrm.tigerhrm_team.Value", "optional" : true, "type" : "struct", "fields" : [ { "field" : "id", "optional" : false, "type" : "int64" }, { "field" : "description", "optional" : true, "type" : "string" }, { "field" : "level", "optional" : true, "type" : "int32" }, { "field" : "name", "optional" : true, "type" : "string" }, { "field" : "leader_id", "optional" : true, "type" : "int64" } ] }, { "field" : "after", "name" : "test.tigerhrm.tigerhrm_team.Value", "optional" : true, "type" : "struct", "fields" : [ { "field" : "id", "optional" : false, "type" : "int64" }, { "field" : "description", "optional" : true, "type" : "string" }, { "field" : "level", "optional" : true, "type" : "int32" }, { "field" : "name", "optional" : true, "type" : "string" }, { "field" : "leader_id", "optional" : true, "type" : "int64" } ] }, { "field" : "source", "name" : "io.debezium.connector.mysql.Source", "optional" : false, "type" : "struct", "fields" : [ { "field" : "version", "optional" : true, "type" : "string" }, { "field" : "connector", "optional" : true, "type" : "string" }, { "field" : "name", "optional" : false, "type" : "string" }, { "field" : "server_id", "optional" : false, "type" : "int64" }, { "field" : "ts_sec", "optional" : false, "type" : "int64" }, { "field" : "gtid", "optional" : true, "type" : "string" }, { "field" : "file", "optional" : false, "type" : "string" }, { "field" : "pos", "optional" : false, "type" : "int64" }, { "field" : "row", "optional" : false, "type" : "int32" }, { "default" : false, "field" : "snapshot", "optional" : true, "type" : "boolean" }, { "field" : "thread", "optional" : true, "type" : "int64" }, { "field" : "db", "optional" : true, "type" : "string" }, { "field" : "table", "optional" : true, "type" : "string" }, { "field" : "query", "optional" : true, "type" : "string" } ] }, { "field" : "op", "optional" : false, "type" : "string" }, { "field" : "ts_ms", "optional" : true, "type" : "int64" } ] },
          "payload" : {
            "op" : "u",
            "before" : { "level" : null, "name" : "sdfsdf", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
            "after" : { "level" : null, "name" : "sdfsdf44", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
            "source" : { "ts_sec" : 1582096597, "query" : null, "thread" : 111, "server_id" : 123, "version" : "0.9.2.Final", "file" : "mysql-bin.000011", "connector" : "mysql", "pos" : 10112, "name" : "test", "gtid" : null, "row" : 0, "snapshot" : false, "db" : "tigerhrm", "table" : "tigerhrm_team" },
            "ts_ms" : 1582096670417
          }
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+1+7",
        "_score" : 1.0,
        "_source" : { "severity" : "WARN", "name" : "Test log 2" }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+16",
        "_score" : 1.0,
        "_source" : {
          "op" : "u",
          "before" : { "level" : null, "name" : "sdfsdf445", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
          "after" : { "level" : null, "name" : "11223344", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
          "source" : { "ts_sec" : 1582109554, "query" : null, "thread" : 111, "server_id" : 123, "version" : "0.9.2.Final", "file" : "mysql-bin.000011", "connector" : "mysql", "pos" : 11845, "name" : "test", "gtid" : null, "row" : 0, "snapshot" : false, "db" : "tigerhrm", "table" : "tigerhrm_team" },
          "ts_ms" : 1582109605979
        }
      },
      {
        "_index" : "test.tigerhrm.tigerhrm_team_index",
        "_type" : "kafka-connect",
        "_id" : "test.tigerhrm.tigerhrm_team+2+17",
        "_score" : 1.0,
        "_source" : {
          "op" : "u",
          "before" : { "level" : null, "name" : "11223344", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
          "after" : { "level" : null, "name" : "1223334444", "description" : "sdfsd", "leader_id" : 199, "id" : 43 },
          "source" : { "ts_sec" : 1582109632, "query" : null, "thread" : 111, "server_id" : 123, "version" : "0.9.2.Final", "file" : "mysql-bin.000011", "connector" : "mysql", "pos" : 12188, "name" : "test", "gtid" : null, "row" : 0, "snapshot" : false, "db" : "tigerhrm", "table" : "tigerhrm_team" },
          "ts_ms" : 1582109632844
        }
      }
    ]
  }
}
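With key.ignore=true, the sink uses topic+partition+offset as each document's _id, as in test.tigerhrm.tigerhrm_team+2+14 above. Every change event is therefore indexed as a separate document instead of overwriting the row. The output also mixes several shapes:

- events wrapped in a schema/payload envelope;
- bare events (the last two hits);
- a few non-Debezium "Test log" documents.

The Python sketch below pulls the Debezium events out of a list of hits and orders them by ts_ms to rebuild a row's change history. The helper names are hypothetical, and treating "has an op field" as the test for a change event is an assumption.

```python
from typing import Optional


def change_event(source: dict) -> Optional[dict]:
    """Return the Debezium event in a document's _source, or None.

    Unwraps the {"schema": ..., "payload": ...} envelope when present and
    accepts bare events; anything without an "op" field (such as the
    "Test log" documents) is not treated as a change event.
    """
    event = source.get("payload", source)
    if not isinstance(event, dict) or "op" not in event:
        return None
    return event


def change_history(hits: list) -> list:
    """(ts_ms, op, before, after) tuples for each change event, oldest first."""
    events = [change_event(hit["_source"]) for hit in hits]
    events = [event for event in events if event is not None]
    # Missing or null ts_ms sorts first instead of raising a TypeError.
    events.sort(key=lambda event: event.get("ts_ms") or 0)
    return [
        (event.get("ts_ms"), event["op"], event.get("before"), event.get("after"))
        for event in events
    ]
```

Applied to the ten hits above, it skips the three "Test log" documents and returns the seven updates of row id 43 in order. The name goes from "sdfsdf" to "sdfsdf44" first and ends at "1223334444".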
That's all.