Wednesday, December 4, 2019

How to submit REST requests to a distributed Kafka Connect cluster

Goal:

This article shares examples of curl commands that submit REST requests to a distributed Kafka Connect cluster.

Env:

MapR 6.1 (secured)
mapr-kafka-1.1.1
mapr-kafka-connect-jdbc-4.1.0

Prerequisite:

1. This article uses the same use case as the earlier article on standalone Kafka Connect, so please read that use case first.
2. To pretty-print the JSON responses, install the "jq" tool on this CentOS 7 environment with the commands below.
yum install epel-release
yum install jq
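3. Create a MapR stream named "/tmp/hivemeta". The first command deletes any existing stream at that path, so the stream is recreated from scratch.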
maprcli stream delete -path /tmp/hivemeta
maprcli stream create -path /tmp/hivemeta
4. Copy the MySQL JDBC driver into the kafka-connect-jdbc library directory on every node.
cp /opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/mysql-connector-java-5.1.25.jar /opt/mapr/kafka-connect-jdbc/kafka-connect-jdbc-4.1.0/share/java/kafka-connect-jdbc/
5. Restart the Kafka Connect service on all nodes.
In this example, the Kafka Connect cluster has 4 nodes: v1.poc.com, v2.poc.com, v3.poc.com and v4.poc.com. Run the command below on each node.
maprcli node services -name kafka-connect -action restart -nodes `hostname -f`
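Instead of logging in to each node, the restart can be driven from one node by looping over the worker host names. This is a minimal sketch that assumes the four host names of this example and that maprcli on the node running it can manage services on the other nodes; CONNECT_NODES and restart_connect_workers are hypothetical names:

```shell
# Assumption: these are the Kafka Connect worker hosts (this article's 4-node cluster).
CONNECT_NODES=${CONNECT_NODES:-"v1.poc.com v2.poc.com v3.poc.com v4.poc.com"}

# Hypothetical helper: restart kafka-connect on each worker, one node at a time,
# with the same maprcli command as above; stops at the first failure.
restart_connect_workers() {
  local node
  for node in $CONNECT_NODES; do   # left unquoted on purpose to split on spaces
    echo "Restarting kafka-connect on $node"
    maprcli node services -name kafka-connect -action restart -nodes "$node" || return 1
  done
}
```

After sourcing it, run restart_connect_workers. Issuing one maprcli call per node makes it easy to see which node, if any, failed to restart.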

Solution:

1. Create a Connector named "mysql-source-dist"

curl -v -X POST https://v1.poc.com:8083/connectors \
--cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr \
-H "Content-Type: application/json" \
--data-binary @- << EOF
{
  "name": "mysql-source-dist",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive",
    "table.whitelist": "TBLS",
    "mode": "incrementing",
    "incrementing.column.name": "TBL_ID",
    "topic.prefix": "/tmp/hivemeta:tbls"
  }
}
EOF
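When this step is scripted, checking the HTTP status code is more convenient than reading the -v output. Kafka Connect answers 201 Created when the connector is created and 409 Conflict when, for instance, a connector with that name already exists. A minimal sketch, assuming the JSON document above has been saved to a file and this article's worker URL, truststore and credentials; create_connector is a hypothetical name:

```shell
# Assumptions: worker URL, truststore and credentials as used in this article.
CONNECT_URL=${CONNECT_URL:-https://v1.poc.com:8083}
CACERT=${CACERT:-/opt/mapr/conf/ssl_truststore.pem}
CREDS=${CREDS:-mapr:mapr}

# Hypothetical helper: POST a {"name": ..., "config": {...}} document stored in a
# file and turn the HTTP status code into a success/failure exit status.
create_connector() {
  local json_file code
  json_file=$1
  # -s hides the progress meter, -o discards the body, -w prints only the status code
  code=$(curl -s -o /dev/null -w '%{http_code}' -X POST "$CONNECT_URL/connectors" \
    --cacert "$CACERT" -u "$CREDS" \
    -H "Content-Type: application/json" \
    --data-binary "@$json_file")
  case $code in
    201) echo "created" ;;
    409) echo "409 Conflict: the connector already exists or a rebalance is in progress" >&2
         return 1 ;;
    *)   echo "unexpected HTTP status: $code" >&2
         return 1 ;;
  esac
}
```

For example, create_connector mysql-source-dist.json, where mysql-source-dist.json (a hypothetical file name) holds the JSON document above.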

2. Check Kafka Connect version

curl https://v1.poc.com:8083 --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr | jq

{
  "version": "1.1.1-mapr-1901",
  "commit": "8185d9b9630a495d",
  "kafka_cluster_id": "7038424823489051793"
}
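Every worker in a distributed cluster serves the REST API, so the same request can be sent to each worker to check that all of them are reachable and report the same version. A minimal sketch, assuming the four host names, port 8083, truststore and credentials of this example; CONNECT_NODES and show_worker_versions are hypothetical names:

```shell
# Assumptions: worker hosts, port 8083, truststore and credentials as in this article.
CONNECT_NODES=${CONNECT_NODES:-"v1.poc.com v2.poc.com v3.poc.com v4.poc.com"}
CACERT=${CACERT:-/opt/mapr/conf/ssl_truststore.pem}
CREDS=${CREDS:-mapr:mapr}

# Hypothetical helper: print each worker's GET / response on one line.
# Returns 1 if any worker could not be queried.
show_worker_versions() {
  local node body rc
  rc=0
  for node in $CONNECT_NODES; do
    # --fail makes curl exit non-zero on HTTP errors as well as connection errors
    if body=$(curl -s --fail --cacert "$CACERT" -u "$CREDS" "https://$node:8083/"); then
      echo "$node: $body"
    else
      echo "$node: unreachable" >&2
      rc=1
    fi
  done
  return $rc
}
```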

3. List connector plugins available on this worker

curl https://v1.poc.com:8083/connector-plugins --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr | jq

[
  {
    "class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "type": "sink",
    "version": "4.1.0"
  },
  {
    "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
    "type": "source",
    "version": "1.1.1-mapr-1901"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "4.1.0"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "4.1.0"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "1.1.1-mapr-1901"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "1.1.1-mapr-1901"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "1.1.1-mapr-1901"
  }
]
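Before creating a connector, the same endpoint can be used to confirm that its class is installed on the worker. A minimal sketch using jq (installed in the prerequisites), assuming this article's worker URL, truststore and credentials; has_plugin is a hypothetical name:

```shell
# Assumptions: worker URL, truststore and credentials as used in this article.
CONNECT_URL=${CONNECT_URL:-https://v1.poc.com:8083}
CACERT=${CACERT:-/opt/mapr/conf/ssl_truststore.pem}
CREDS=${CREDS:-mapr:mapr}

# Hypothetical helper: exit status 0 if /connector-plugins lists the given class.
has_plugin() {
  # jq -e exits 0 only when the expression evaluates to true
  curl -s --cacert "$CACERT" -u "$CREDS" "$CONNECT_URL/connector-plugins" |
    jq -e --arg c "$1" 'any(.[]; .class == $c)' >/dev/null
}
```

For example, has_plugin io.confluent.connect.jdbc.JdbcSourceConnector && echo installed.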

4. List the active connectors

curl https://v1.poc.com:8083/connectors --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr | jq

[
  "mysql-source-dist"
]
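The list of names can be combined with the status endpoint in step 7 to print a one-line summary per connector. A minimal sketch using jq, assuming this article's worker URL, truststore and credentials, and connector names without whitespace; summarize_connectors is a hypothetical name:

```shell
# Assumptions: worker URL, truststore and credentials as used in this article.
CONNECT_URL=${CONNECT_URL:-https://v1.poc.com:8083}
CACERT=${CACERT:-/opt/mapr/conf/ssl_truststore.pem}
CREDS=${CREDS:-mapr:mapr}

# Hypothetical helper: one line per connector with its state and its tasks' states.
summarize_connectors() {
  local names name
  # GET /connectors returns a JSON array of names; jq prints one name per line
  names=$(curl -s --fail --cacert "$CACERT" -u "$CREDS" "$CONNECT_URL/connectors" | jq -r '.[]') || return 1
  for name in $names; do   # relies on connector names containing no whitespace
    curl -s --fail --cacert "$CACERT" -u "$CREDS" "$CONNECT_URL/connectors/$name/status" |
      jq -r '"\(.name) connector=\(.connector.state) tasks=\([.tasks[].state] | join(","))"'
  done
}
```

For the connector in this article it would print a line such as "mysql-source-dist connector=RUNNING tasks=RUNNING".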

5. Get Connector information

curl https://v1.poc.com:8083/connectors/mysql-source-dist --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr | jq

{
  "name": "mysql-source-dist",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "mode": "incrementing",
    "incrementing.column.name": "TBL_ID",
    "topic.prefix": "/tmp/hivemeta:tbls",
    "task.user": "mapr",
    "tasks.max": "1",
    "name": "mysql-source-dist",
    "connection.url": "jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive",
    "table.whitelist": "TBLS"
  },
  "tasks": [],
  "type": "source"
}

6. Get Connector configuration

curl https://v1.poc.com:8083/connectors/mysql-source-dist/config --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr | jq

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "incrementing",
  "incrementing.column.name": "TBL_ID",
  "topic.prefix": "/tmp/hivemeta:tbls",
  "task.user": "mapr",
  "tasks.max": "1",
  "name": "mysql-source-dist",
  "connection.url": "jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive",
  "table.whitelist": "TBLS"
}

7. Get Connector status

curl https://v1.poc.com:8083/connectors/mysql-source-dist/status --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr | jq

{
  "name": "mysql-source-dist",
  "connector": {
    "state": "RUNNING",
    "worker_id": "v2.poc.com:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "v4.poc.com:8083"
    }
  ],
  "type": "source"
}

8. Pause a Connector

curl -X PUT https://v1.poc.com:8083/connectors/mysql-source-dist/pause --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr
Pausing is asynchronous, so the state changes only after a short while. Checking the Connector status again should then show both the Connector and all of its tasks as PAUSED:
curl https://v1.poc.com:8083/connectors/mysql-source-dist/status --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr | jq

{
  "name": "mysql-source-dist",
  "connector": {
    "state": "PAUSED",
    "worker_id": "v2.poc.com:8083"
  },
  "tasks": [
    {
      "state": "PAUSED",
      "id": 0,
      "worker_id": "v4.poc.com:8083"
    }
  ],
  "type": "source"
}
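Because pause and resume take effect asynchronously, a script should poll the status endpoint rather than check it once. A minimal sketch using jq, assuming this article's worker URL, truststore and credentials; wait_for_state and its 2-second polling interval are hypothetical choices:

```shell
# Assumptions: worker URL, truststore and credentials as used in this article.
CONNECT_URL=${CONNECT_URL:-https://v1.poc.com:8083}
CACERT=${CACERT:-/opt/mapr/conf/ssl_truststore.pem}
CREDS=${CREDS:-mapr:mapr}

# Hypothetical helper: poll /status until the connector and all of its tasks
# report the wanted state. Returns 1 if that does not happen within N polls.
wait_for_state() {
  local name want tries i
  name=$1 want=$2 tries=${3:-30}   # default: 30 polls, 2 seconds apart
  i=0
  while [ "$i" -lt "$tries" ]; do
    # jq -e exits 0 only when the expression evaluates to true
    if curl -s --cacert "$CACERT" -u "$CREDS" "$CONNECT_URL/connectors/$name/status" |
        jq -e --arg s "$want" '.connector.state == $s and all(.tasks[]; .state == $s)' >/dev/null; then
      return 0
    fi
    i=$((i + 1))
    sleep 2
  done
  return 1
}
```

For example, wait_for_state mysql-source-dist PAUSED after the pause request, or wait_for_state mysql-source-dist RUNNING after the resume request.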

9. Resume a Connector

curl -X PUT https://v1.poc.com:8083/connectors/mysql-source-dist/resume --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr
Resuming is asynchronous as well. After a short while, checking the Connector status again should show both the Connector and all of its tasks as RUNNING:
curl https://v1.poc.com:8083/connectors/mysql-source-dist/status --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr | jq

{
  "name": "mysql-source-dist",
  "connector": {
    "state": "RUNNING",
    "worker_id": "v2.poc.com:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "v4.poc.com:8083"
    }
  ],
  "type": "source"
}

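10. Update an existing Connector's configuration

This example raises "tasks.max" from 1 to 2. The request body is the flat configuration map itself, without the {"name": ..., "config": ...} wrapper used when creating the Connector.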

curl -v -X PUT https://v1.poc.com:8083/connectors/mysql-source-dist/config \
--cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr \
-H "Content-Type: application/json" \
--data-binary @- << EOF
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "tasks.max": "2",
  "connection.url": "jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive",
  "table.whitelist": "TBLS",
  "mode": "incrementing",
  "incrementing.column.name": "TBL_ID",
  "topic.prefix": "/tmp/hivemeta:tbls"
}
EOF
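Instead of retyping the whole configuration, the output of step 6 can be edited with jq and sent back to the same endpoint. A minimal sketch, assuming this article's worker URL, truststore and credentials; set_connector_property is a hypothetical name. It drops "task.user" and "name" so the body has the same keys as the one sent in this step, on the assumption that the worker adds "task.user" again itself, as it did after step 1:

```shell
# Assumptions: worker URL, truststore and credentials as used in this article.
CONNECT_URL=${CONNECT_URL:-https://v1.poc.com:8083}
CACERT=${CACERT:-/opt/mapr/conf/ssl_truststore.pem}
CREDS=${CREDS:-mapr:mapr}

# Hypothetical helper: GET the flat config, change one property, PUT it back.
set_connector_property() {
  local name key value cfg
  name=$1 key=$2 value=$3
  # --fail turns HTTP errors into a non-zero exit so nothing is PUT on failure
  cfg=$(curl -s --fail --cacert "$CACERT" -u "$CREDS" "$CONNECT_URL/connectors/$name/config") || return 1
  # Drop the fields this step did not send, set the property (as a string), PUT the result
  printf '%s' "$cfg" |
    jq --arg k "$key" --arg v "$value" 'del(.["task.user"], .name) | .[$k] = $v' |
    curl -s --fail -X PUT "$CONNECT_URL/connectors/$name/config" \
      --cacert "$CACERT" -u "$CREDS" \
      -H "Content-Type: application/json" \
      --data-binary @-
}
```

For example, set_connector_property mysql-source-dist tasks.max 2 performs the same change as the request above.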

11. List the tasks and their configurations

curl https://v1.poc.com:8083/connectors/mysql-source-dist/tasks --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr | jq

[
  {
    "id": {
      "connector": "mysql-source-dist",
      "task": 0
    },
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "mode": "incrementing",
      "incrementing.column.name": "TBL_ID",
      "topic.prefix": "/tmp/hivemeta:tbls",
      "task.user": "mapr",
      "tables": "TBLS",
      "task.class": "io.confluent.connect.jdbc.source.JdbcSourceTask",
      "tasks.max": "2",
      "name": "mysql-source-dist",
      "connection.url": "jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive",
      "table.whitelist": "TBLS"
    }
  }
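]

Only one task is listed even though "tasks.max" is now 2: the JDBC source connector creates at most one task per table, and only one table (TBLS) is whitelisted here.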

12. Get the status of an individual task

curl https://v1.poc.com:8083/connectors/mysql-source-dist/tasks/0/status --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr | jq

{
  "state": "RUNNING",
  "id": 0,
  "worker_id": "v4.poc.com:8083"
}

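13. Restart an individual task

Any worker in the cluster can accept this request. Here it is sent to v4.poc.com, the worker that runs task 0 according to the status output in step 7.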

curl -X POST https://v4.poc.com:8083/connectors/mysql-source-dist/tasks/0/restart --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr
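The status endpoint can also drive this restart call, for example to restart every task of a connector that is in the FAILED state. A minimal sketch using jq, assuming this article's worker URL, truststore and credentials; restart_failed_tasks is a hypothetical name:

```shell
# Assumptions: worker URL, truststore and credentials as used in this article.
CONNECT_URL=${CONNECT_URL:-https://v1.poc.com:8083}
CACERT=${CACERT:-/opt/mapr/conf/ssl_truststore.pem}
CREDS=${CREDS:-mapr:mapr}

# Hypothetical helper: restart only the tasks whose state is FAILED.
restart_failed_tasks() {
  local name ids id
  name=$1
  # Collect the ids of FAILED tasks from /status, one per line
  ids=$(curl -s --fail --cacert "$CACERT" -u "$CREDS" "$CONNECT_URL/connectors/$name/status" |
    jq -r '.tasks[] | select(.state == "FAILED") | .id') || return 1
  for id in $ids; do
    echo "Restarting task $id of $name"
    curl -s --fail -X POST "$CONNECT_URL/connectors/$name/tasks/$id/restart" \
      --cacert "$CACERT" -u "$CREDS" || return 1
  done
}
```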

14. Delete the Connector

curl -X DELETE https://v1.poc.com:8083/connectors/mysql-source-dist --cacert /opt/mapr/conf/ssl_truststore.pem  -u mapr:mapr
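After the deletion, a GET on the connector's URL should return 404 Not Found. A minimal sketch that turns this into an exit status, assuming this article's worker URL, truststore and credentials; connector_exists is a hypothetical name:

```shell
# Assumptions: worker URL, truststore and credentials as used in this article.
CONNECT_URL=${CONNECT_URL:-https://v1.poc.com:8083}
CACERT=${CACERT:-/opt/mapr/conf/ssl_truststore.pem}
CREDS=${CREDS:-mapr:mapr}

# Hypothetical helper: 0 = connector exists, 1 = 404 Not Found, 2 = anything else.
connector_exists() {
  local code
  # -o discards the body, -w prints only the HTTP status code
  code=$(curl -s -o /dev/null -w '%{http_code}' --cacert "$CACERT" -u "$CREDS" \
    "$CONNECT_URL/connectors/$1")
  case $code in
    200) return 0 ;;
    404) return 1 ;;
    *)   echo "unexpected HTTP status: $code" >&2
         return 2 ;;
  esac
}
```

For example, connector_exists mysql-source-dist; echo $? should print 1 once the deletion has gone through.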

Reference:

https://mapr.com/docs/61/Kafka/Connect-rest-api.html
