Wednesday, September 5, 2018

How to use Drill on MapR Streams through the Kafka storage plugin

Goal:

How to use Drill on MapR Streams through the Kafka storage plugin.

Env:

Drill 1.13
MapR 6.0.1

Solution:

Drill can query MapR Streams through the Kafka storage plugin, which was first introduced in Drill 1.12.
The steps to set up the Kafka storage plugin on the MapR platform are here.
Below is a quick walk-through:

1. Refresh the Kafka-related jar files in Drill's 3rdparty jar directory.

clush -a "mv /opt/mapr/drill/drill-1.13.0/jars/3rdparty/kafka*.jar /tmp/"
clush -a "cp /opt/mapr/lib/mapr-streams-6.0.1-mapr.jar /opt/mapr/drill/drill-1.13.0/jars/3rdparty/"
clush -a "cp /opt/mapr/kafka/kafka-1.0.1/libs/kafka_2.11-1.0.1-mapr-1803.jar /opt/mapr/drill/drill-1.13.0/jars/3rdparty/"
clush -a "cp /opt/mapr/kafka/kafka-1.0.1/libs/kafka-clients-1.0.1-mapr-1803.jar /opt/mapr/drill/drill-1.13.0/jars/3rdparty/"

2. Restart all drillbits

maprcli node services -name drill-bits -action restart -filter csvc=="drill-bits"

3. Configure and enable the "kafka" storage plugin

Assuming the MapR stream path you want to query is "/stream/json", we need to set:
"streams.consumer.default.stream": "/stream/json"
A complete example of the kafka storage plugin configuration is:
{
  "type": "kafka",
  "kafkaConsumerProps": {
    "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "bootstrap.servers": "localhost:9092",
    "group.id": "drill-consumer",
    "streams.consumer.default.stream": "/stream/json",
    "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  },
  "enabled": true
}

4. Query MapR Stream using Drill

First, "show tables" should return the stream and its topic name.
0: jdbc:drill:zk=v1.poc.com:5181,v2.poc.com:5> use kafka;
+-------+------------------------------------+
|  ok   |              summary               |
+-------+------------------------------------+
| true  | Default schema changed to [kafka]  |
+-------+------------------------------------+
1 row selected (0.171 seconds)
0: jdbc:drill:zk=v1.poc.com:5181,v2.poc.com:5> show tables;
+---------------+--------------------+
| TABLE_SCHEMA  |     TABLE_NAME     |
+---------------+--------------------+
| kafka         | /stream/json:info  |
+---------------+--------------------+
1 row selected (0.19 seconds)
As of now, the following metadata fields are supported:
  • kafkaTopic
  • kafkaPartitionId
  • kafkaMsgOffset
  • kafkaMsgTimestamp
  • kafkaMsgKey (only when the message key is not null)
For example:
0: jdbc:drill:zk=v1.poc.com:5181,v2.poc.com:5>  select * from kafka.`/stream/json:info` limit 5;
+------+--------------------+-------------------+-----------------+--------------------+
| Msg  |     kafkaTopic     | kafkaPartitionId  | kafkaMsgOffset  | kafkaMsgTimestamp  |
+------+--------------------+-------------------+-----------------+--------------------+
| 0    | /stream/json:info  | 0                 | 0               | 1536178967377      |
| 1    | /stream/json:info  | 0                 | 1               | 1536178967377      |
| 2    | /stream/json:info  | 0                 | 2               | 1536178967377      |
| 3    | /stream/json:info  | 0                 | 3               | 1536178970377      |
| 4    | /stream/json:info  | 0                 | 4               | 1536178970377      |
+------+--------------------+-------------------+-----------------+--------------------+
5 rows selected (0.321 seconds)
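The kafkaMsgTimestamp values above are Kafka record timestamps, i.e. milliseconds since the Unix epoch. As a minimal sketch (the class and method names are hypothetical and not part of Drill or MapR), they can be turned into readable UTC times in Java like this:

```java
import java.time.Instant;

// Hypothetical helper, not part of Drill: converts a kafkaMsgTimestamp
// value (milliseconds since the Unix epoch) to an ISO-8601 UTC string.
final class KafkaTimestampDemo {

    static String toUtc(long kafkaMsgTimestamp) {
        return Instant.ofEpochMilli(kafkaMsgTimestamp).toString();
    }

    public static void main(String[] args) {
        // Timestamp taken from the first row of the query output above.
        System.out.println(toUtc(1536178967377L)); // prints 2018-09-05T20:22:47.377Z
    }
}
```

The result matches the date of this post, which is a quick sanity check that the field really is in milliseconds rather than seconds.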
Note: The "Msg" column in the query result above is the payload created by my customized producer.
Most of my producer is similar to this link.
The only change is that the value for this stream topic is created by Java code like this:
for(int i = 0; i < numMessages; i++) {
   String messageText = "{\"Msg\": " + i + "}";
   ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topic, messageText);
   ...
}
Here we create a JSON string for the value, such as {"Msg": 123}.
This is because the current version of Drill only supports the JSON reader, which means the option store.kafka.record.reader is set to org.apache.drill.exec.store.kafka.decoders.JsonMessageReader by default.
0: jdbc:drill:zk=v1.poc.com:5181,v2.poc.com:5> select string_val from sys.options where name='store.kafka.record.reader';
+---------------------------------------------------------------+
|                          string_val                           |
+---------------------------------------------------------------+
| org.apache.drill.exec.store.kafka.decoders.JsonMessageReader  |
+---------------------------------------------------------------+
1 row selected (0.223 seconds)
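Because only the JSON reader is available, every message value has to be a valid JSON document. The payload construction from the producer loop above can be sketched in isolation as follows. The class and method names are hypothetical, and actually sending the strings still needs a Kafka producer as in the original code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds the same JSON payloads as the producer loop,
// without depending on the Kafka client library.
final class JsonPayloadDemo {

    // Returns a JSON object string such as {"Msg": 123}.
    static String jsonValue(int i) {
        return "{\"Msg\": " + i + "}";
    }

    // Builds the payloads for message numbers 0 .. numMessages-1.
    static List<String> buildPayloads(int numMessages) {
        List<String> payloads = new ArrayList<>();
        for (int i = 0; i < numMessages; i++) {
            payloads.add(jsonValue(i));
        }
        return payloads;
    }

    public static void main(String[] args) {
        // Each string would become the value of one ProducerRecord.
        for (String payload : buildPayloads(3)) {
            System.out.println(payload);
        }
    }
}
```

Keeping the payload building separate from the send call makes it easy to check the JSON before it ever reaches the stream.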

Limitations (as of Drill 1.13)

1. Only JSON messages are readable

As explained above, if the message is not in JSON format, the error below may show up:
0: jdbc:drill:zk=v1.poc.com:5181,v2.poc.com:5>  select * from kafka.`/stream/s1:info` limit 5;
Error: DATA_READ ERROR: Failure while reading messages from kafka. Recordreader was at record: 1

com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 1 column 6
Fragment 0:0

[Error Id: 365ae54b-ba73-4611-833a-0ee1601b11d7 on v1.poc.com:31010] (state=,code=0)

2. Filter pushdown does not exist in Drill 1.13, but is supported in Drill 1.14.

In Drill 1.13, even a filter on the offset results in a full table (stream:topic) scan.
For example, if the query is:
select * from kafka.`/stream/json:info` where kafkaMsgOffset=500
The physical plan is:
00-00    Screen : rowType = RecordType(DYNAMIC_STAR **): rowcount = 1231.6499999999999, cumulative cost = {28451.115000000005 rows, 74432.71499999998 cpu, 8408064.0 io, 0.0 network, 0.0 memory}, id = 7372
00-01      Project(**=[$0]) : rowType = RecordType(DYNAMIC_STAR **): rowcount = 1231.6499999999999, cumulative cost = {28327.950000000004 rows, 74309.54999999999 cpu, 8408064.0 io, 0.0 network, 0.0 memory}, id = 7371
00-02        Project(T5¦¦**=[$0]) : rowType = RecordType(DYNAMIC_STAR T5¦¦**): rowcount = 1231.6499999999999, cumulative cost = {27096.300000000003 rows, 73077.9 cpu, 8408064.0 io, 0.0 network, 0.0 memory}, id = 7370
00-03          SelectionVectorRemover : rowType = RecordType(DYNAMIC_STAR T5¦¦**, ANY kafkaMsgOffset): rowcount = 1231.6499999999999, cumulative cost = {25864.65 rows, 71846.25 cpu, 8408064.0 io, 0.0 network, 0.0 memory}, id = 7369
00-04            Filter(condition=[=($1, 500)]) : rowType = RecordType(DYNAMIC_STAR T5¦¦**, ANY kafkaMsgOffset): rowcount = 1231.6499999999999, cumulative cost = {24633.0 rows, 70614.6 cpu, 8408064.0 io, 0.0 network, 0.0 memory}, id = 7368
00-05              Project(T5¦¦**=[$0], kafkaMsgOffset=[$1]) : rowType = RecordType(DYNAMIC_STAR T5¦¦**, ANY kafkaMsgOffset): rowcount = 8211.0, cumulative cost = {16422.0 rows, 32844.0 cpu, 8408064.0 io, 0.0 network, 0.0 memory}, id = 7367
00-06                Scan(table=[[kafka, /stream/json:info]], groupscan=[KafkaGroupScan [KafkaScanSpec=KafkaScanSpec [topicName=/stream/json:info], columns=[`**`]]]) : rowType = RecordType(DYNAMIC_STAR **, ANY kafkaMsgOffset): rowcount = 8211.0, cumulative cost = {8211.0 rows, 16422.0 cpu, 8408064.0 io, 0.0 network, 0.0 memory}, id = 7366
Per the Drill documentation, Drill 1.14 supports filter pushdown on the following metadata fields:
  • kafkaPartitionId
    Conditions on the kafkaPartitionId metadata field limit the number of partitions that Drill scans, which is useful for data exploration. Drill can push down filters when a query contains the following conditions on the kafkaPartitionId metadata field:
    =, >, >=, <, <=
  • kafkaMsgOffset
    Drill can push down filters when a query contains the following conditions on the kafkaMsgOffset metadata field:
    =, >, >=, <, <=
  • kafkaMsgTimestamp
    The kafkaMsgTimestamp field maps to the timestamp stored for each Kafka message. Drill can push down filters when a query contains the following conditions on the kafkaMsgTimestamp metadata field:
    =, >, >=
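The operator lists above can be summarized as a small lookup table. The following is only an illustrative Java sketch of that table. The class and method names are hypothetical and this is not how Drill implements pushdown; it simply encodes which comparison operators the documentation lists for each metadata field.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical lookup table of the operators listed for each metadata field.
final class KafkaPushdownTable {

    private static final Map<String, Set<String>> LISTED = new HashMap<>();

    static {
        LISTED.put("kafkaPartitionId", new HashSet<>(Arrays.asList("=", ">", ">=", "<", "<=")));
        LISTED.put("kafkaMsgOffset", new HashSet<>(Arrays.asList("=", ">", ">=", "<", "<=")));
        LISTED.put("kafkaMsgTimestamp", new HashSet<>(Arrays.asList("=", ">", ">=")));
    }

    // True if the documentation lists this operator for this field.
    static boolean isListed(String field, String operator) {
        Set<String> operators = LISTED.get(field);
        return operators != null && operators.contains(operator);
    }

    public static void main(String[] args) {
        System.out.println(isListed("kafkaMsgOffset", "="));    // true
        System.out.println(isListed("kafkaMsgTimestamp", "<")); // false
        System.out.println(isListed("Msg", "="));               // false, payload column
    }
}
```

Note that a filter on a payload column such as "Msg" is not in the list, so it should be expected to scan the whole topic.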

3. Querying a replicated target stream may result in "Error: DATA_READ ERROR: Failed to fetch messages within 200 milliseconds. Consider increasing the value of the property : store.kafka.poll.timeout".

On Drill 1.13, suppose we replicate "/stream/json" to the target "/stream/jsontarget":
maprcli stream replica autosetup -path /stream/json -replica /stream/jsontarget
Then querying the target stream "/stream/jsontarget" may sometimes result in the error below:
Error: DATA_READ ERROR: Failed to fetch messages within 200 milliseconds. Consider increasing the value of the property : store.kafka.poll.timeout

DATA_READ ERROR: Failed to fetch messages within 200 milliseconds. Consider increasing the value of the property : store.kafka.poll.timeout


[Error Id: a25039f9-d539-4f8d-a99b-3ee13d5cfebd ]
Fragment 0:0

[Error Id: a25039f9-d539-4f8d-a99b-3ee13d5cfebd on v1.poc.com:31010] (state=,code=0)
Limitation #3 is already fixed in Drill 1.14 on MapR 6.1.

Use cases

Drill on MapR Streams can be a handy tool for batch analysis or for checking the lag/difference between source and target streams.
For example, at partition level:
0: jdbc:drill:zk=v1.poc.com:5181,v2.poc.com:5> select kafkaPartitionId,count(*) from kafka.`/stream/json:info` group by kafkaPartitionId;
+-------------------+---------+
| kafkaPartitionId  | EXPR$1  |
+-------------------+---------+
| 0                 | 5343    |
| 1                 | 953     |
| 2                 | 960     |
| 3                 | 955     |
+-------------------+---------+
4 rows selected (0.616 seconds)
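To check the lag between a source and a target stream, the same GROUP BY query can be run against both streams and the per-partition counts compared. A minimal Java sketch of that comparison is shown below. The class and method names are hypothetical, the source counts are taken from the output above, and the target counts are made-up values for illustration only.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: compares per-partition message counts of two streams.
final class StreamLagDemo {

    // lag = source count - target count for each source partition;
    // a partition missing from the target is treated as having 0 messages.
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> source,
                                              Map<Integer, Long> target) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : source.entrySet()) {
            long targetCount = target.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), entry.getValue() - targetCount);
        }
        return lag;
    }

    public static void main(String[] args) {
        // Counts from the query on the source stream shown above.
        Map<Integer, Long> source = new TreeMap<>();
        source.put(0, 5343L);
        source.put(1, 953L);
        source.put(2, 960L);
        source.put(3, 955L);

        // Made-up counts for the target stream, for illustration only.
        Map<Integer, Long> target = new TreeMap<>();
        target.put(0, 5300L);
        target.put(1, 953L);
        target.put(2, 960L);
        target.put(3, 950L);

        System.out.println(lagPerPartition(source, target)); // prints {0=43, 1=0, 2=0, 3=5}
    }
}
```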
