Goal:
This article shares examples of curl commands that submit REST requests to a distributed Kafka Connect cluster.
Env:
MapR 6.1 (secured)
mapr-kafka-1.1.1
mapr-kafka-connect-jdbc-4.1.0
Prerequisite:
1. This article uses the same use case as the earlier article on standalone Kafka Connect. Please understand that use case first.
2. To pretty-print the JSON responses, install the "jq" tool on this CentOS 7 environment using the commands below.
yum install epel-release
yum install jq
3. Create a MapR stream named "/tmp/hivemeta".
maprcli stream delete -path /tmp/hivemeta
maprcli stream create -path /tmp/hivemeta
4. Put the MySQL JDBC driver into the kafka-connect JDBC path.
cp /opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/mysql-connector-java-5.1.25.jar /opt/mapr/kafka-connect-jdbc/kafka-connect-jdbc-4.1.0/share/java/kafka-connect-jdbc/
5. Restart Kafka Connect on all nodes of the cluster.
In this example, this is a 4-node Kafka Connect cluster: v1.poc.com, v2.poc.com, v3.poc.com and v4.poc.com. Run the command below on each node.
maprcli node services -name kafka-connect -action restart -nodes `hostname -f`
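After the restart it can be useful to confirm that every worker answers on its REST port before submitting requests. The following is a minimal sketch, not part of the original procedure: the host names and port 8083 are the ones used in this article, while the helper names worker_url and check_workers are made up for illustration. "up" here only means that curl could connect and complete the TLS handshake, and a worker may need some time after the restart before it responds.

```shell
# Hosts and port come from this article; the helper names are hypothetical.
NODES="v1.poc.com v2.poc.com v3.poc.com v4.poc.com"

# Print the base REST URL of one worker.
worker_url() {
  printf 'https://%s:8083\n' "$1"
}

# Print "<host>: up" or "<host>: down" for every worker in NODES.
# curl exits non-zero when it cannot connect or finish the TLS handshake;
# it still exits 0 on HTTP error responses because -f is not used.
check_workers() {
  for node in $NODES; do
    if curl -s -o /dev/null --max-time 5 "$(worker_url "$node")" \
         --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr; then
      echo "$node: up"
    else
      echo "$node: down"
    fi
  done
}
```

Calling check_workers after the restart prints one line per node.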
Solution:
1. Create a Connector named "mysql-source-dist"
curl -v -X POST https://v1.poc.com:8083/connectors \
  --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr \
  -H "Content-Type: application/json" \
  --data-binary @- << EOF
{
  "name": "mysql-source-dist",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive",
    "table.whitelist": "TBLS",
    "mode": "incrementing",
    "incrementing.column.name": "TBL_ID",
    "topic.prefix": "/tmp/hivemeta:tbls"
  }
}
EOF
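The requests in this article all pass the same worker URL, --cacert file and -u credentials. As a convenience, they could be wrapped in a small shell function. This is only a sketch: the function name kc and the KC_* variables are hypothetical, the defaults are the values used in this article, and -s is added to hide curl's progress meter. Setting KC_DRY_RUN=1 prints the command instead of running it.

```shell
# Hypothetical helper, not part of MapR or Kafka Connect.
# Defaults are the values used throughout this article.
KC_URL="${KC_URL:-https://v1.poc.com:8083}"
KC_CACERT="${KC_CACERT:-/opt/mapr/conf/ssl_truststore.pem}"
KC_USER="${KC_USER:-mapr:mapr}"

# Usage: kc METHOD PATH [extra curl options...]
# With KC_DRY_RUN=1 the curl command line is printed instead of executed.
kc() {
  method="$1"
  path="$2"
  shift 2
  # Rebuild the argument list: common options first, caller's extras last.
  set -- -s -X "$method" "${KC_URL}${path}" \
    --cacert "$KC_CACERT" -u "$KC_USER" "$@"
  if [ "${KC_DRY_RUN:-0}" = "1" ]; then
    printf '%s\n' "curl $*"
  else
    curl "$@"
  fi
}
```

With this helper, "kc GET /connectors | jq" would list the connectors, and "kc PUT /connectors/mysql-source-dist/pause" would pause the connector created above.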
2. Check Kafka Connect version
curl https://v1.poc.com:8083 --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr | jq
{
  "version": "1.1.1-mapr-1901",
  "commit": "8185d9b9630a495d",
  "kafka_cluster_id": "7038424823489051793"
}
3. List connector plugins available on this worker
curl https://v1.poc.com:8083/connector-plugins --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr | jq
[
  {
    "class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "type": "sink",
    "version": "4.1.0"
  },
  {
    "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
    "type": "source",
    "version": "1.1.1-mapr-1901"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "4.1.0"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "4.1.0"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "1.1.1-mapr-1901"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "1.1.1-mapr-1901"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "1.1.1-mapr-1901"
  }
]
4. List active connectors on a worker
curl https://v1.poc.com:8083/connectors --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr | jq
[
  "mysql-source-dist"
]
5. Get Connector information
curl https://v1.poc.com:8083/connectors/mysql-source-dist --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr | jq
{
  "name": "mysql-source-dist",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "mode": "incrementing",
    "incrementing.column.name": "TBL_ID",
    "topic.prefix": "/tmp/hivemeta:tbls",
    "task.user": "mapr",
    "tasks.max": "1",
    "name": "mysql-source-dist",
    "connection.url": "jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive",
    "table.whitelist": "TBLS"
  },
  "tasks": [],
  "type": "source"
}
6. Get Connector configuration
curl https://v1.poc.com:8083/connectors/mysql-source-dist/config --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr | jq
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "incrementing",
  "incrementing.column.name": "TBL_ID",
  "topic.prefix": "/tmp/hivemeta:tbls",
  "task.user": "mapr",
  "tasks.max": "1",
  "name": "mysql-source-dist",
  "connection.url": "jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive",
  "table.whitelist": "TBLS"
}
7. Get Connector status
curl https://v1.poc.com:8083/connectors/mysql-source-dist/status --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr | jq
{
  "name": "mysql-source-dist",
  "connector": {
    "state": "RUNNING",
    "worker_id": "v2.poc.com:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "v4.poc.com:8083"
    }
  ],
  "type": "source"
}
8. Pause a Connector
curl -X PUT https://v1.poc.com:8083/connectors/mysql-source-dist/pause --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr
After a while, if we check the Connector status again, both the Connector and all tasks should be PAUSED.
curl https://v1.poc.com:8083/connectors/mysql-source-dist/status --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr | jq
{
  "name": "mysql-source-dist",
  "connector": {
    "state": "PAUSED",
    "worker_id": "v2.poc.com:8083"
  },
  "tasks": [
    {
      "state": "PAUSED",
      "id": 0,
      "worker_id": "v4.poc.com:8083"
    }
  ],
  "type": "source"
}
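Because the state change takes effect "after a while", a script that pauses a Connector may want to poll the status endpoint instead of sleeping for a fixed time. The sketch below shows one way to do that. The function names wait_for_state and all_states and the POLL_INTERVAL variable are hypothetical; the URL, certificate and credentials are the ones from this article.

```shell
# wait_for_state EXPECTED MAX_TRIES FETCH_COMMAND...
# Runs FETCH_COMMAND until it prints exactly EXPECTED.
# Returns 0 on success and 1 after MAX_TRIES unsuccessful attempts.
wait_for_state() {
  expected="$1"
  tries="$2"
  shift 2
  i=0
  while [ "$i" -lt "$tries" ]; do
    state="$("$@")"
    [ "$state" = "$expected" ] && return 0
    i=$((i + 1))
    sleep "${POLL_INTERVAL:-2}"   # seconds between polls (assumed default)
  done
  return 1
}

# Prints the distinct states of the Connector and all of its tasks,
# comma-separated. The result is a single word such as PAUSED only when
# the Connector and every task report the same state.
all_states() {
  curl -s https://v1.poc.com:8083/connectors/mysql-source-dist/status \
    --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr |
    jq -r '[.connector.state, .tasks[].state] | unique | join(",")'
}
```

For example, "wait_for_state PAUSED 15 all_states" polls up to 15 times and succeeds once everything is PAUSED. The same call with RUNNING can be used after resuming.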
9. Resume a Connector
curl -X PUT https://v1.poc.com:8083/connectors/mysql-source-dist/resume --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr
After a while, if we check the Connector status again, both the Connector and all tasks should be RUNNING.
curl https://v1.poc.com:8083/connectors/mysql-source-dist/status --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr | jq
{
  "name": "mysql-source-dist",
  "connector": {
    "state": "RUNNING",
    "worker_id": "v2.poc.com:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "v4.poc.com:8083"
    }
  ],
  "type": "source"
}
10. Update an existing Connector configuration
curl -v -X PUT https://v1.poc.com:8083/connectors/mysql-source-dist/config \
  --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr \
  -H "Content-Type: application/json" \
  --data-binary @- << EOF
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "tasks.max": "2",
  "connection.url": "jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive",
  "table.whitelist": "TBLS",
  "mode": "incrementing",
  "incrementing.column.name": "TBL_ID",
  "topic.prefix": "/tmp/hivemeta:tbls"
}
EOF
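Note that the body of this PUT request is only the flat configuration map, without the "name" and "config" wrapper used when the Connector was created. If the create request is kept in a file, the update body can be derived from it with jq rather than typed again. This is a sketch under assumptions: the function name put_payload and the file name mysql-source-dist.json are hypothetical.

```shell
# put_payload FILE TASKS_MAX
# FILE holds a create request of the form {"name": ..., "config": {...}}.
# Prints only the "config" map, with tasks.max replaced by TASKS_MAX
# (kept as a string, as in the requests shown in this article).
put_payload() {
  jq --arg n "$2" '.config | .["tasks.max"] = $n' "$1"
}

# Possible usage (mysql-source-dist.json is a hypothetical file name):
#   put_payload mysql-source-dist.json 2 |
#     curl -v -X PUT https://v1.poc.com:8083/connectors/mysql-source-dist/config \
#       --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr \
#       -H "Content-Type: application/json" --data-binary @-
```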
11. Get the tasks of a Connector
curl https://v1.poc.com:8083/connectors/mysql-source-dist/tasks --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr | jq
[
  {
    "id": {
      "connector": "mysql-source-dist",
      "task": 0
    },
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "mode": "incrementing",
      "incrementing.column.name": "TBL_ID",
      "topic.prefix": "/tmp/hivemeta:tbls",
      "task.user": "mapr",
      "tables": "TBLS",
      "task.class": "io.confluent.connect.jdbc.source.JdbcSourceTask",
      "tasks.max": "2",
      "name": "mysql-source-dist",
      "connection.url": "jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive",
      "table.whitelist": "TBLS"
    }
  }
]
12. Get an individual task status
curl https://v1.poc.com:8083/connectors/mysql-source-dist/tasks/0/status --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr | jq
{
  "state": "RUNNING",
  "id": 0,
  "worker_id": "v4.poc.com:8083"
}
13. Restart an individual task
curl -X POST https://v4.poc.com:8083/connectors/mysql-source-dist/tasks/0/restart --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr
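The restart above is sent to v4.poc.com, the worker that the status output reports for task 0. When a Connector has several tasks, the status response could be used to find the tasks in the FAILED state and restart each one on its own worker. The following is only a sketch of that idea: the function names failed_tasks and restart_failed_tasks are hypothetical, and it assumes the worker_id values in the status response are reachable host:port pairs, as they are in this article.

```shell
# Reads a /status response on stdin and prints "<task id> <worker_id>"
# for every task whose state is FAILED.
failed_tasks() {
  jq -r '.tasks[] | select(.state == "FAILED") | "\(.id) \(.worker_id)"'
}

# Fetches the status of mysql-source-dist and restarts each FAILED task
# through the worker that currently owns it.
restart_failed_tasks() {
  curl -s https://v1.poc.com:8083/connectors/mysql-source-dist/status \
    --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr |
    failed_tasks |
    while read -r id worker; do
      echo "restarting task $id on $worker"
      curl -s -X POST "https://$worker/connectors/mysql-source-dist/tasks/$id/restart" \
        --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr
    done
}
```

Running restart_failed_tasks does nothing when no task is FAILED.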
14. Delete the Connector
curl -X DELETE https://v1.poc.com:8083/connectors/mysql-source-dist --cacert /opt/mapr/conf/ssl_truststore.pem -u mapr:mapr