Goal:
This article helps you understand the different modes of kafka-connect through an example. The example streams data from a MySQL table to MapR Event Store for Apache Kafka (aka "MapR Streams") using the different kafka-connect modes -- incrementing, bulk, timestamp, and timestamp+incrementing.
Env:
MapR 6.1 (secured)
mapr-kafka-1.1.1
mapr-kafka-connect-jdbc-4.1.0
Solution:
Please read the documentation at https://mapr.com/docs/61/Kafka/kafkaConnect.html first to understand the architecture of mapr-kafka-connect.
Use case:
In the Hive Metastore backend database -- MySQL, there is a table named "TBLS" which tracks Hive table information. I chose "TBLS" as the data source because it has a strictly incrementing column named "TBL_ID" and also a timestamp-related column named "CREATE_TIME" (int(11) data type).
We plan to keep monitoring this table and stream the data into a MapR stream named "/tmp/hivemeta".
The standalone mode of kafka-connect is used to demonstrate this use case easily.
To monitor which queries run on the source -- MySQL, we will enable the MySQL general log.
To monitor what data is written to the target -- MapR Streams, we will use Drill to query the stream through the kafka storage plugin.
1. Put the MySQL JDBC driver into the kafka-connect JDBC path.
cp /opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/mysql-connector-java-5.1.25.jar /opt/mapr/kafka-connect-jdbc/kafka-connect-jdbc-4.1.0/share/java/kafka-connect-jdbc/
2. Recreate the MapR stream "/tmp/hivemeta" before each test.
maprcli stream delete -path /tmp/hivemeta
maprcli stream create -path /tmp/hivemeta
3. mode=incrementing
incrementing: use a strictly incrementing column on each table to detect only new rows. Note that this will not detect modifications or deletions of existing rows.
Sample Connector file "test.conf":
name=mysql-whitelist-incre-source-tbls
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive
table.whitelist=TBLS
mode=incrementing
incrementing.column.name=TBL_ID
topic.prefix=/tmp/hivemeta:tbls

Start the standalone mode for this Connector:
/opt/mapr/kafka/kafka-1.1.1/bin/connect-standalone.sh /opt/mapr/kafka/kafka-1.1.1/config/connect-standalone.properties ~/test.conf

Source side -- MySQL receives the queries below:
2534 Query SELECT * FROM `TBLS` WHERE `TBL_ID` > -1 ORDER BY `TBL_ID` ASC
191127 14:50:49 2534 Query SELECT * FROM `TBLS` WHERE `TBL_ID` > 77 ORDER BY `TBL_ID` ASC
191127 14:50:54 2534 Query SELECT * FROM `TBLS` WHERE `TBL_ID` > 77 ORDER BY `TBL_ID` ASC
191127 14:50:59 2534 Query SELECT * FROM `TBLS` WHERE `TBL_ID` > 77 ORDER BY `TBL_ID` ASC

The logic: the first query does a full table scan with the condition "WHERE `TBL_ID` > -1" plus an ORDER BY.
Then, for new incoming data, it keeps polling with "WHERE `TBL_ID` > 77", where 77 is the currently committed offset.
Target side -- MapR Streams receives the data up to that offset.
> select t.payload.TBL_ID, t.payload.TBL_NAME from kafka.`/tmp/hivemeta:tblsTBLS` as t;
+---------+-----------------------+
| EXPR$0  | EXPR$1                |
...
| 77      | t1                    |
+---------+-----------------------+
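The incrementing-mode polling above can be sketched in Python. This is a hypothetical simplification, not the actual kafka-connect-jdbc implementation: the connector remembers the largest committed value of the incrementing column and only fetches rows beyond it.

```python
# Hypothetical sketch of incrementing-mode polling; names and helpers
# are illustrative, not the real connector code.

def build_incrementing_query(table, column, last_offset):
    # The first poll starts from -1, so the whole table is scanned once.
    return (f"SELECT * FROM `{table}` "
            f"WHERE `{column}` > {last_offset} "
            f"ORDER BY `{column}` ASC")

def poll(rows, column, last_offset):
    """Return only rows past the committed offset, plus the new offset."""
    new_rows = [r for r in rows if r[column] > last_offset]
    new_offset = max((r[column] for r in new_rows), default=last_offset)
    return new_rows, new_offset

rows = [{"TBL_ID": 76, "TBL_NAME": "t0"}, {"TBL_ID": 77, "TBL_NAME": "t1"}]
first, offset = poll(rows, "TBL_ID", -1)      # first poll: full scan
again, offset = poll(rows, "TBL_ID", offset)  # later polls: nothing new
```

Note this mirrors why incrementing mode cannot see updates or deletes: a modified row keeps its old TBL_ID and is filtered out by the `>` condition.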
4. mode=bulk
bulk: perform a bulk load of the entire table each time it is polled.

Sample Connector file "test_bulk.conf":
name=mysql-whitelist-bulk-source-tbls
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive
table.whitelist=TBLS
mode=bulk
topic.prefix=/tmp/hivemeta:tbls

Source side -- MySQL receives the queries below:
191127 14:57:54 2537 Query SELECT * FROM `TBLS`
191127 14:57:59 2537 Query SELECT * FROM `TBLS`

The logic: every poll performs a full table scan.
It is like taking a snapshot of the whole data source periodically.
If the source table is huge, this consumes a lot of resources (CPU, memory, disk, and network) on both the source and target sides.
5. mode=timestamp
timestamp: use a timestamp (or timestamp-like) column to detect new and modified rows. This assumes the column is updated with each write, and that values are monotonically incrementing, but not necessarily unique.

Sample Connector file "test_timestamp.conf":
name=mysql-whitelist-timestamp-source-tbls
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive
#table.whitelist=TBLS
mode=timestamp
query=SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t
timestamp.column.name=custom_timestamp
topic.prefix=/tmp/hivemeta:tbls

In this example, the "CREATE_TIME" column is not of the "timestamp" data type in MySQL.
The "CREATE_TIME" values are actually Unix timestamps stored as "int" in MySQL.
To work around this so that "mode=timestamp" can be used, I use "query" instead of "table.whitelist", applying the MySQL function "FROM_UNIXTIME" to convert that column to a "timestamp" data type under a new column name, "custom_timestamp".
Then I set timestamp.column.name=custom_timestamp instead.
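The conversion FROM_UNIXTIME performs can be illustrated in Python. The epoch value below is hypothetical (chosen to line up with the session's timestamps, assuming UTC for determinism; MySQL's FROM_UNIXTIME actually uses the server's session time zone):

```python
from datetime import datetime, timezone

# CREATE_TIME is stored as a Unix epoch integer; FROM_UNIXTIME turns it
# into a DATETIME value the connector can compare against CURRENT_TIMESTAMP.
create_time = 1574868864  # hypothetical epoch value
custom_timestamp = datetime.fromtimestamp(create_time, tz=timezone.utc)
print(custom_timestamp.strftime("%Y-%m-%d %H:%M:%S"))  # 2019-11-27 15:34:24
```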
Source side -- MySQL receives below queries:
191127 15:34:30 2551 Query select CURRENT_TIMESTAMP
2551 Query SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t WHERE `custom_timestamp` > '2019-11-27 15:34:24' AND `custom_timestamp` < '2019-11-27 15:34:30' ORDER BY `custom_timestamp` ASC
191127 15:34:35 2551 Query select CURRENT_TIMESTAMP
2551 Query SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t WHERE `custom_timestamp` > '2019-11-27 15:34:24' AND `custom_timestamp` < '2019-11-27 15:34:35' ORDER BY `custom_timestamp` ASC

The logic: it first checks the current timestamp and uses it as the upper boundary of the WHERE condition; the lower boundary is the committed offset.
6. mode=timestamp+incrementing
timestamp+incrementing: use two columns, a timestamp column that detects new and modified rows and a strictly incrementing column which provides a globally unique ID for updates so each row can be assigned a unique stream offset.

Sample Connector file "test_incre+timestamp.conf":
name=mysql-whitelist-timestampincre-source-tbls
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive
#table.whitelist=TBLS
mode=timestamp+incrementing
query=SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t
timestamp.column.name=custom_timestamp
incrementing.column.name=TBL_ID
topic.prefix=/tmp/hivemeta:tbls

Source side -- MySQL receives the queries below:
191127 15:38:02 2554 Query select CURRENT_TIMESTAMP
2554 Query SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t WHERE `custom_timestamp` < '2019-11-27 15:38:02' AND ((`custom_timestamp` = '2019-11-27 15:34:24' AND `TBL_ID` > 81) OR `custom_timestamp` > '2019-11-27 15:34:24') ORDER BY `custom_timestamp`,`TBL_ID` ASC
191127 15:38:07 2554 Query select CURRENT_TIMESTAMP
2554 Query SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t WHERE `custom_timestamp` < '2019-11-27 15:38:07' AND ((`custom_timestamp` = '2019-11-27 15:37:57' AND `TBL_ID` > 82) OR `custom_timestamp` > '2019-11-27 15:37:57') ORDER BY `custom_timestamp`,`TBL_ID` ASC

The logic is a little more complex now:
the row's timestamp is earlier than the current timestamp
AND
( the row's timestamp equals the committed timestamp but its incrementing column is larger
OR
the row's timestamp is newer than the committed timestamp
)
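The predicate above can be sketched as a small Python check. This is a hypothetical simplification of the generated WHERE clause (the function name and sample values are illustrative, taken from the query log above):

```python
from datetime import datetime

def is_new_row(row_ts, row_id, committed_ts, committed_id, current_ts):
    """Mirror of the timestamp+incrementing WHERE clause: the row must be
    strictly before the current timestamp, and either share the committed
    timestamp with a larger ID, or carry a strictly newer timestamp."""
    return (row_ts < current_ts and
            ((row_ts == committed_ts and row_id > committed_id) or
             row_ts > committed_ts))

committed_ts = datetime(2019, 11, 27, 15, 34, 24)  # committed offset
current_ts = datetime(2019, 11, 27, 15, 38, 2)     # query upper boundary

# Same timestamp as the committed offset but a larger TBL_ID -> picked up.
assert is_new_row(committed_ts, 82, committed_ts, 81, current_ts)
# Strictly newer timestamp -> picked up regardless of TBL_ID.
assert is_new_row(datetime(2019, 11, 27, 15, 37, 57), 1,
                  committed_ts, 81, current_ts)
# Already-committed row (same timestamp, same ID) -> skipped.
assert not is_new_row(committed_ts, 81, committed_ts, 81, current_ts)
```

"Timestamp is newer" here simply means the row's timestamp column value is strictly greater than the timestamp of the last committed offset, so the row must have been inserted or modified after the last poll.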