Thursday, November 28, 2019

Understanding different modes in kafka-connect using an example

Goal:

This article explains the different modes of kafka-connect through an example.
The example streams data from a MySQL table to MapR Event Store for Apache Kafka (aka "MapR Streams") using each mode of the kafka-connect JDBC source connector: incrementing, bulk, timestamp and timestamp+incrementing.

Env:

MapR 6.1 (secured)
mapr-kafka-1.1.1
mapr-kafka-connect-jdbc-4.1.0

Solution:

Please read the documentation at https://mapr.com/docs/61/Kafka/kafkaConnect.html first to understand the architecture of mapr-kafka-connect.

Use case:

In the Hive Metastore backend database (MySQL), there is a table named "TBLS" which tracks Hive table information.
I chose "TBLS" as the data source because it has a strictly incrementing column named "TBL_ID" and also a timestamp-like column named "CREATE_TIME" (int(11) data type).
We plan to keep monitoring this table and stream its data into a MapR stream named "/tmp/hivemeta".
The standalone mode of kafka-connect is used to keep the demonstration simple.

To monitor which queries run on the source (MySQL), we enable the MySQL general log.
To monitor what data is written to the target (MapR Streams), we use Drill to query the stream through the Kafka storage plugin.

1. Copy the MySQL JDBC driver into the kafka-connect JDBC path.

cp /opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/mysql-connector-java-5.1.25.jar /opt/mapr/kafka-connect-jdbc/kafka-connect-jdbc-4.1.0/share/java/kafka-connect-jdbc/

2. Recreate the MapR stream named "/tmp/hivemeta" before each test.

maprcli stream delete -path /tmp/hivemeta
maprcli stream create -path /tmp/hivemeta

3. mode=incrementing

incrementing: use a strictly incrementing column on each table to detect only new rows. Note that this will not detect modifications or deletions of existing rows.
Sample Connector file "test.conf":
name=mysql-whitelist-incre-source-tbls
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive
table.whitelist=TBLS
mode=incrementing
incrementing.column.name=TBL_ID
topic.prefix=/tmp/hivemeta:tbls
Start the standalone mode for this Connector:
/opt/mapr/kafka/kafka-1.1.1/bin/connect-standalone.sh /opt/mapr/kafka/kafka-1.1.1/config/connect-standalone.properties ~/test.conf
Source side -- MySQL receives the following queries:
   2534 Query SELECT * FROM `TBLS` WHERE `TBL_ID` > -1 ORDER BY `TBL_ID` ASC
191127 14:50:49  2534 Query SELECT * FROM `TBLS` WHERE `TBL_ID` > 77 ORDER BY `TBL_ID` ASC
191127 14:50:54  2534 Query SELECT * FROM `TBLS` WHERE `TBL_ID` > 77 ORDER BY `TBL_ID` ASC
191127 14:50:59  2534 Query SELECT * FROM `TBLS` WHERE `TBL_ID` > 77 ORDER BY `TBL_ID` ASC
The logic: the first query scans the whole table with the condition "WHERE `TBL_ID` > -1" plus an ORDER BY.
After that, each poll looks for new rows with "WHERE `TBL_ID` > 77", where 77 is the currently committed offset.
Target side -- MapR Streams receives the data up to that offset. Because the connector appends the table name to topic.prefix, the topic is "/tmp/hivemeta:tblsTBLS".
> select t.payload.TBL_ID, t.payload.TBL_NAME from kafka.`/tmp/hivemeta:tblsTBLS` as t;
+---------+-----------------------+
| EXPR$0  |        EXPR$1         |
...
| 77      | t1                    |
+---------+-----------------------+
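
The polling behaviour of incrementing mode can be sketched in Python. This is only an illustration and not the connector's actual implementation: an in-memory SQLite table stands in for MySQL, and the function name poll_incrementing and the sample rows are made up for the example.

```python
import sqlite3


def poll_incrementing(conn, last_id):
    """One poll: fetch rows whose TBL_ID is above the committed offset."""
    rows = conn.execute(
        "SELECT TBL_ID, TBL_NAME FROM TBLS WHERE TBL_ID > ? ORDER BY TBL_ID ASC",
        (last_id,),
    ).fetchall()
    # The offset moves to the largest ID returned; it is unchanged if nothing is new.
    new_offset = rows[-1][0] if rows else last_id
    return rows, new_offset


# SQLite stands in for MySQL here (assumption for the sketch).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE TBLS (TBL_ID INTEGER PRIMARY KEY, TBL_NAME TEXT)")
conn.executemany("INSERT INTO TBLS VALUES (?, ?)", [(76, "t0"), (77, "t1")])

first, offset = poll_incrementing(conn, -1)       # initial poll starts at -1
second, offset = poll_incrementing(conn, offset)  # no new rows yet

conn.execute("INSERT INTO TBLS VALUES (78, 't2')")                      # new row
conn.execute("UPDATE TBLS SET TBL_NAME = 'renamed' WHERE TBL_ID = 77")  # modified row
third, offset = poll_incrementing(conn, offset)

print(first)   # both existing rows
print(second)  # empty list
print(third)   # only the inserted row; the update is not picked up
```

The last poll returns only the inserted row, which matches the note that this mode does not detect modifications of existing rows.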

4. mode=bulk

bulk: perform a bulk load of the entire table each time it is polled.
Sample Connector file "test_bulk.conf":
name=mysql-whitelist-bulk-source-tbls
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive
table.whitelist=TBLS
mode=bulk
topic.prefix=/tmp/hivemeta:tbls
Source side -- MySQL receives the following queries:
191127 14:57:54  2537 Query SELECT * FROM `TBLS`
191127 14:57:59  2537 Query SELECT * FROM `TBLS`
The logic: every poll performs a full table scan.
It is like periodically taking a snapshot of the whole data source.
If the source table is huge, this consumes a lot of resources (CPU/memory/disk/network) on both the source and target sides.
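
A small Python sketch shows why bulk mode gets expensive: with no offset tracking, every poll re-emits every row. As an assumption for the example, an in-memory SQLite table stands in for MySQL, and poll_bulk and the sample rows are hypothetical.

```python
import sqlite3


def poll_bulk(conn):
    """One poll in bulk mode: read the whole table, with no offset tracking."""
    return conn.execute("SELECT TBL_ID, TBL_NAME FROM TBLS").fetchall()


# SQLite stands in for MySQL here (assumption for the sketch).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE TBLS (TBL_ID INTEGER PRIMARY KEY, TBL_NAME TEXT)")
conn.executemany("INSERT INTO TBLS VALUES (?, ?)", [(76, "t0"), (77, "t1")])

# Three polls against an unchanged table.
polls = [poll_bulk(conn) for _ in range(3)]
records_emitted = sum(len(rows) for rows in polls)
print(records_emitted)  # 3 polls x 2 rows, all duplicates after the first poll
```

The number of records written to the target grows with the number of polls even though the source table never changed.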

5. mode=timestamp

timestamp: use a timestamp (or timestamp-like) column to detect new and modified rows. This assumes the column is updated with each write, and that values are monotonically incrementing, but not necessarily unique.
Sample Connector file "test_timestamp.conf":
name=mysql-whitelist-timestamp-source-tbls
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive
#table.whitelist=TBLS
mode=timestamp
query=SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t
timestamp.column.name=custom_timestamp
topic.prefix=/tmp/hivemeta:tbls
In this example, the "CREATE_TIME" column is not of the "timestamp" data type in MySQL.
Its data is actually a Unix timestamp stored as "int".
To work around this so that we can use "mode=timestamp", I use "query" instead of "table.whitelist" and call the MySQL function "FROM_UNIXTIME" to convert that column to a "timestamp" value under a new column name, "custom_timestamp".
The query is wrapped in a subquery because the connector appends its own WHERE clause to it, and that clause must be able to reference "custom_timestamp".
Then set timestamp.column.name=custom_timestamp.
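
To make the conversion concrete, here is a minimal Python sketch of what turning an integer Unix timestamp into a timestamp value means. The sample CREATE_TIME value is hypothetical, and the sketch fixes the time zone to UTC as an assumption; MySQL's FROM_UNIXTIME returns the value in the session time zone instead.

```python
from datetime import datetime, timezone


def from_unixtime_utc(seconds):
    """Convert integer epoch seconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


create_time = 1574868864  # hypothetical CREATE_TIME value (epoch seconds)
custom_timestamp = from_unixtime_utc(create_time)
print(custom_timestamp.strftime("%Y-%m-%d %H:%M:%S"))
```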
Source side -- MySQL receives the following queries:
191127 15:34:30  2551 Query select CURRENT_TIMESTAMP
   2551 Query SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t WHERE `custom_timestamp` > '2019-11-27 15:34:24' AND `custom_timestamp` < '2019-11-27 15:34:30' ORDER BY `custom_timestamp` ASC
191127 15:34:35  2551 Query select CURRENT_TIMESTAMP
   2551 Query SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t WHERE `custom_timestamp` > '2019-11-27 15:34:24' AND `custom_timestamp` < '2019-11-27 15:34:35' ORDER BY `custom_timestamp` ASC
The logic: the connector first fetches the current timestamp and uses it as the upper bound of the WHERE condition. The lower bound is the committed offset.
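
The lower and upper bounds can be sketched in Python as follows. This is an illustration under stated assumptions and not the connector's code: SQLite stands in for MySQL, SQLite's datetime(..., 'unixepoch') stands in for FROM_UNIXTIME, the caller passes the "current timestamp" instead of querying it, and poll_timestamp and the sample rows are hypothetical.

```python
import sqlite3

# SQLite's datetime(..., 'unixepoch') stands in for MySQL's FROM_UNIXTIME.
BASE_QUERY = (
    "SELECT t.* FROM (SELECT *, datetime(CREATE_TIME, 'unixepoch') "
    "AS custom_timestamp FROM TBLS) t"
)


def poll_timestamp(conn, last_ts, now_ts):
    """One poll: rows strictly between the committed offset and 'now'."""
    sql = (
        BASE_QUERY
        + " WHERE custom_timestamp > ? AND custom_timestamp < ?"
        + " ORDER BY custom_timestamp ASC"
    )
    rows = conn.execute(sql, (last_ts, now_ts)).fetchall()
    # custom_timestamp is the last column of each row.
    new_offset = rows[-1][-1] if rows else last_ts
    return rows, new_offset


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE TBLS (TBL_ID INTEGER, TBL_NAME TEXT, CREATE_TIME INTEGER)")
conn.executemany(
    "INSERT INTO TBLS VALUES (?, ?, ?)",
    [(81, "t5", 1574868864), (82, "t6", 1574869077)],  # hypothetical rows
)

rows1, offset = poll_timestamp(conn, "1970-01-01 00:00:00", "2019-11-27 15:34:30")
rows2, offset = poll_timestamp(conn, offset, "2019-11-27 15:38:02")
print([r[0] for r in rows1], [r[0] for r in rows2], offset)
```

Each poll only returns rows newer than the offset left by the previous poll, so the second poll picks up the later row and nothing else.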

6. mode=timestamp+incrementing

timestamp+incrementing: use two columns, a timestamp column that detects new and modified rows and a strictly incrementing column which provides a globally unique ID for updates so each row can be assigned a unique stream offset.
Sample Connector file "test_incre+timestamp.conf":
name=mysql-whitelist-timestampincre-source-tbls
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive
#table.whitelist=TBLS
mode=timestamp+incrementing
query=SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t
timestamp.column.name=custom_timestamp
incrementing.column.name=TBL_ID
topic.prefix=/tmp/hivemeta:tbls
Source side -- MySQL receives the following queries:
191127 15:38:02  2554 Query select CURRENT_TIMESTAMP
   2554 Query SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t WHERE `custom_timestamp` < '2019-11-27 15:38:02' AND ((`custom_timestamp` = '2019-11-27 15:34:24' AND `TBL_ID` > 81) OR `custom_timestamp` > '2019-11-27 15:34:24') ORDER BY `custom_timestamp`,`TBL_ID` ASC
191127 15:38:07  2554 Query select CURRENT_TIMESTAMP
   2554 Query SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t WHERE `custom_timestamp` < '2019-11-27 15:38:07' AND ((`custom_timestamp` = '2019-11-27 15:37:57' AND `TBL_ID` > 82) OR `custom_timestamp` > '2019-11-27 15:37:57') ORDER BY `custom_timestamp`,`TBL_ID` ASC
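The logic is a little more complex now. A row is selected when:

timestamp is earlier than the current timestamp
AND
( timestamp equals the committed timestamp offset but the incrementing column is larger than the committed ID
  OR
  timestamp is newer than the committed timestamp offset
)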
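
The same condition can be written as a small Python predicate. This is a sketch for illustration only: the function is_selected and the helper ts are hypothetical names, the offset values (15:34:24, ID 81) and the upper bound (15:38:02) are taken from the first logged query above, and the rows with IDs 83 and 84 are invented to exercise each branch.

```python
from datetime import datetime


def is_selected(row_ts, row_id, last_ts, last_id, now_ts):
    """Mirror of the WHERE clause used by timestamp+incrementing mode."""
    return row_ts < now_ts and (
        (row_ts == last_ts and row_id > last_id) or row_ts > last_ts
    )


def ts(text):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a datetime."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


# Committed offset and upper bound from the first logged query.
last_ts, last_id = ts("2019-11-27 15:34:24"), 81
now_ts = ts("2019-11-27 15:38:02")

candidates = [
    (ts("2019-11-27 15:34:24"), 81),  # already committed
    (ts("2019-11-27 15:34:24"), 83),  # same timestamp, larger ID (invented row)
    (ts("2019-11-27 15:37:57"), 82),  # newer timestamp
    (ts("2019-11-27 15:38:05"), 84),  # not yet earlier than "now" (invented row)
]
selected = [
    row_id
    for row_ts, row_id in candidates
    if is_selected(row_ts, row_id, last_ts, last_id, now_ts)
]
print(selected)
```

The ID comparison is what lets the connector resume correctly inside a group of rows that share one timestamp, which timestamp mode alone cannot do.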
