Friday, September 7, 2018

How to use KSQL on MapR Streams (MapR-ES)

Goal:

KSQL is a streaming SQL engine that enables stream processing against Apache Kafka and MapR Streams (aka MapR-ES). KSQL interacts directly with the Kafka Streams API, removing the requirement of building a Java app.
This article provides an example to help you quickly get hands-on with KSQL on MapR Streams, following this quick start.

Env:

MapR 6.0.1

Solution:

Note: ksql is still in developer preview in MapR 6.0.1.

1. Download the ksql tarball.

wget http://package.mapr.com/labs/ksql/package-ksql-4.1.0-SNAPSHOT-package.tgz
tar -xzvf package-ksql-4.1.0-SNAPSHOT-package.tgz
cd ksql

2. Create a MapR internal stream with public access

Public access is needed because security is not supported for this Developer Preview on MapR 6.0.1.
maprcli stream create -path /var/mapr/kafka-internal-stream -produceperm p -consumeperm p -topicperm p
  • This stream is used for internal topic creation and cli-side partition assignment.
  • The internal stream should be created before starting the KSQL server. Once KSQL is running, its internal topics show up in this stream:
[root@v2 data]# maprcli stream topic list -path /var/mapr/kafka-internal-stream
partitions  maxlag  logicalsize  topic                                                                consumers  physicalsize
1           0       0            __mapr__ksql_transient_4130383735097621206_1536269653676_assignment  0          0
1           0       0            ksql__commands                                                       0          0
1           0       0            __mapr__ksql_transient_7811816849950865146_1536269574284_assignment  0          0

3. Modify the configuration for ksql

vim ./config/ksqlserver.properties
A sample configuration:
bootstrap.servers=localhost:9092
ksql.command.topic.suffix=commands

listeners=http://localhost:8080
application.id=myksql

4. Create a sample MapR stream to use in the examples below.

maprcli stream create -path /sample-stream -produceperm p -consumeperm p -topicperm p

5. Generate messages using ksql-datagen

ksql provides a handy tool named "ksql-datagen", which saves you from writing a test producer program.
Refer to this link.
Here we generate sample messages into 2 topics named "pageviews" and "users".
The commands and sample data for each topic are shown below:

  • "pageviews":
./bin/ksql-datagen quickstart=pageviews format=delimited topic=/sample-stream:pageviews maxInterval=10000
Sample data looks like:
1 --> ([ 1536339899430 | 'User_8' | 'Page_12' ])
11 --> ([ 1536339904639 | 'User_9' | 'Page_11' ])
21 --> ([ 1536339906898 | 'User_1' | 'Page_25' ])
...
  • "users":
./bin/ksql-datagen quickstart=users format=json topic=/sample-stream:users maxInterval=10000
Sample data looks like:
User_6 --> ([ 1488285478109 | 'User_6' | 'Region_1' | 'FEMALE' ])
User_7 --> ([ 1509065238387 | 'User_7' | 'Region_5' | 'OTHER' ])
User_2 --> ([ 1491876978482 | 'User_2' | 'Region_1' | 'OTHER' ])
...
Then use the 2 commands below to check the topic information inside the MapR stream:
maprcli stream topic info -path /sample-stream -topic pageviews -json
maprcli stream topic info -path /sample-stream -topic users -json

6. Start ksql in standalone mode

./bin/ksql-cli local --properties-file ./config/ksqlserver.properties
After that, the ksql Java process is running:
[root@v2 config]# ps -ef|grep -i ksql
root     6768 13847 66 12:08 pts/1    00:00:07 java -cp /root/ksql/share/java/ksql/*: -Xmx3g -server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dksql.log.dir=/tmp/ksql-logs -Dlog4j.configuration=file:/root/ksql/config/log4j-file.properties io.confluent.ksql.Ksql local --properties-file ./config/ksqlserver.properties
Show the current properties:
ksql> SHOW PROPERTIES;

 Property                                         | Value
------------------------------------------------------------------------------------------------------------------
 streams.default.stream                           |
 ksql.transient.prefix                            | transient_
 commit.interval.ms                               | 2000
 ksql.sink.replicas                               | 1
 listeners                                        | http://localhost:9098
 default.deserialization.exception.handler        | io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler
 bootstrap.servers                                | localhost:9099
 ksql.schema.registry.url                         | http://localhost:8081
 ksql.sink.partitions                             | 4
 ksql.statestore.suffix                           | _ksql_statestore
 ksql.service.id                                  | ksql_
 cache.max.bytes.buffering                        | 10000000
 ksql.default.stream                              |
 ksql.sink.window.change.log.additional.retention | 1000000
 auto.offset.reset                                | latest
 num.stream.threads                               | 4
 ksql.persistent.prefix                           | query_
 application.id                                   | ksql_
------------------------------------------------------------------------------------------------------------------

7. Create a stream for topic "pageviews" and a table for topic "users".

Here "stream" and "table" are Kafka concepts.
  • A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. Each record behaves like an "INSERT".
  • A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream’s data records are key-value pairs). Each record behaves like an "UPDATE".
With these concepts in mind, the choice becomes clear: "pageviews" is an append-only series of independent events, so it is modeled as a stream, while "users" holds the latest profile for each userid, so it is modeled as a table.
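The relationship between a stream and a table can be sketched in a few lines of Python. This is only an illustration of the concept, not how KSQL implements it; the third record (a later update for User_6) is made up for the example.

```python
# Hypothetical changelog: (key, value) records in arrival order,
# loosely modeled on the "users" topic. The third record is invented.
changelog = [
    ("User_6", {"regionid": "Region_1", "gender": "FEMALE"}),
    ("User_7", {"regionid": "Region_5", "gender": "OTHER"}),
    ("User_6", {"regionid": "Region_3", "gender": "FEMALE"}),
]


def materialize(records):
    """Fold a stream of key-value records into a table of latest values."""
    table = {}
    for key, value in records:
        # A newer record for the same key overwrites the older one (UPDATE).
        table[key] = value
    return table


users_table = materialize(changelog)

print("records in stream:", len(changelog))
print("rows in table:", len(users_table))
print("latest region for User_6:", users_table["User_6"]["regionid"])
```

The stream keeps every record, while the table keeps one row per key, which is why the user profiles fit a table and the page view events fit a stream.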
Now here are the exact commands in ksql:
ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='/sample-stream:pageviews', value_format='DELIMITED');

 Message
----------------
 Stream created
----------------

ksql> DESCRIBE pageviews_original;

 Field    | Type
--------------------------------------
 ROWTIME  | BIGINT           (system)
 ROWKEY   | VARCHAR(STRING)  (system)
 VIEWTIME | BIGINT
 USERID   | VARCHAR(STRING)
 PAGEID   | VARCHAR(STRING)
--------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;


ksql> CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='/sample-stream:users', value_format='JSON', key = 'userid');

 Message
---------------
 Table created
---------------
ksql> DESCRIBE users_original;

 Field        | Type
------------------------------------------
 ROWTIME      | BIGINT           (system)
 ROWKEY       | VARCHAR(STRING)  (system)
 REGISTERTIME | BIGINT
 GENDER       | VARCHAR(STRING)
 REGIONID     | VARCHAR(STRING)
 USERID       | VARCHAR(STRING)
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

8. Run a query that reads the stream or table above.

By default KSQL reads the topics for streams and tables from the latest offset.
ksql> select * from pageviews_original ;
1536342236738 | 4871 | 1536342235234 | User_8 | Page_12
1536342243408 | 4881 | 1536342241904 | User_4 | Page_18
1536342252370 | 4891 | 1536342250867 | User_4 | Page_13
...
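The output rows above carry no header. Assuming the columns of "select *" follow the order shown by DESCRIBE (ROWTIME, ROWKEY, then the declared columns), a row can be labeled with a small Python sketch. The helper name label_row is hypothetical.

```python
# Column order assumed from the DESCRIBE pageviews_original output.
COLUMNS = ["ROWTIME", "ROWKEY", "VIEWTIME", "USERID", "PAGEID"]


def label_row(line):
    """Split one '|'-separated output row and pair each value with its column."""
    values = [v.strip() for v in line.split("|")]
    return dict(zip(COLUMNS, values))


row = label_row("1536342236738 | 4871 | 1536342235234 | User_8 | Page_12")

# Difference between the record timestamp and the generated view time, in ms.
lag_ms = int(row["ROWTIME"]) - int(row["VIEWTIME"])

print(row)
print("ROWTIME - VIEWTIME (ms):", lag_ms)
```

Read this way, the first two values of each row are the system columns and the remaining three are the fields declared in the CREATE STREAM statement.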
The default location for the local state store (RocksDB) is /tmp/kafka-streams, and a subdirectory /tmp/kafka-streams/<application.id> is created for each Kafka Streams application.
While the query is running, we can check the application id of the currently running query:
[root@v2 ~]# ls -altr /tmp/kafka-streams/
drwxr-xr-x   3 root root   17 Sep  7 10:54 ksql_transient_1376501806109265799_1536342862444
Then, using the application.id, we can search the "jstack <ksql pid>" output for the threads related to this query:
[root@v2 ~]# jstack 6768 |grep ksql_transient_1376501806109265799_1536342862444
"ksql_transient_1376501806109265799_1536342862444-76a58c09-c3a3-4c13-adb7-ab00b7f7ccf7-CleanupThread" #197 daemon prio=5 os_prio=0 tid=0x00007f9298076800 nid=0x1ed4 waiting on condition [0x00007f927d0e3000]
"ksql_transient_1376501806109265799_1536342862444-76a58c09-c3a3-4c13-adb7-ab00b7f7ccf7-StreamThread-8" #196 prio=5 os_prio=0 tid=0x00007f9298075000 nid=0x1ed3 runnable [0x00007f927d1e4000]
"ksql_transient_1376501806109265799_1536342862444-76a58c09-c3a3-4c13-adb7-ab00b7f7ccf7-StreamThread-7" #195 prio=5 os_prio=0 tid=0x00007f9298074000 nid=0x1ed2 runnable [0x00007f927d3e6000]
"ksql_transient_1376501806109265799_1536342862444-76a58c09-c3a3-4c13-adb7-ab00b7f7ccf7-StreamThread-6" #194 prio=5 os_prio=0 tid=0x00007f9298073000 nid=0x1ed1 runnable [0x00007f927d4e7000]
"ksql_transient_1376501806109265799_1536342862444-76a58c09-c3a3-4c13-adb7-ab00b7f7ccf7-StreamThread-5" #193 prio=5 os_prio=0 tid=0x00007f92982e9800 nid=0x1ed0 runnable [0x00007f927dced000]

9. Create a persistent query to store the data into a MapR Stream topic

Unlike the non-persistent query above, results from this query are written to a MapR Stream topic "pageviews_female".
CREATE STREAM pageviews_female WITH (kafka_topic='/sample-stream:pageviews_female', value_format='json') AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
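To make the semantics of this statement concrete, here is a minimal Python sketch of a stream-table LEFT JOIN followed by the gender filter. It ignores timing (a real join uses the table state at the moment each event is processed), and the page view events are hypothetical, chosen so that one of them matches a FEMALE user.

```python
# Table side: latest row per userid (values taken from the sample data).
users = {
    "User_6": {"regionid": "Region_1", "gender": "FEMALE"},
    "User_7": {"regionid": "Region_5", "gender": "OTHER"},
}

# Stream side: hypothetical (viewtime, userid, pageid) events.
pageviews = [
    (1536339899430, "User_6", "Page_12"),
    (1536339904639, "User_7", "Page_11"),
    (1536339906898, "User_1", "Page_25"),  # no matching row in the table
]


def pageviews_female(events, table):
    """Left-join each event with the table, then keep rows where gender is FEMALE."""
    for _viewtime, userid, pageid in events:
        user = table.get(userid)  # LEFT JOIN: the lookup may find nothing
        row = {
            "userid": userid if user else None,
            "pageid": pageid,
            "regionid": user["regionid"] if user else None,
            "gender": user["gender"] if user else None,
        }
        # WHERE gender = 'FEMALE' also drops unmatched rows (gender is None).
        if row["gender"] == "FEMALE":
            yield row


result = list(pageviews_female(pageviews, users))
print(result)
```

Note that the WHERE clause removes the rows the LEFT JOIN could not match, so for this query the result is the same as it would be with an inner join.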
After the CREATE STREAM statement returns, the query keeps running in background threads:
[root@v2 ~]# jstack 6768 |grep ksql_query_CSAS_PAGEVIEWS_FEMALE
"ksql_query_CSAS_PAGEVIEWS_FEMALE-af068d1f-9ff4-4acb-abe7-e4ff37a5c91c-CleanupThread" #261 daemon prio=5 os_prio=0 tid=0x00007f92e0039000 nid=0x27aa waiting on condition [0x00007f927cee1000]
"ksql_query_CSAS_PAGEVIEWS_FEMALE-af068d1f-9ff4-4acb-abe7-e4ff37a5c91c-StreamThread-16" #260 prio=5 os_prio=0 tid=0x00007f92e0037800 nid=0x27a9 runnable [0x00007f927cfe2000]
"ksql_query_CSAS_PAGEVIEWS_FEMALE-af068d1f-9ff4-4acb-abe7-e4ff37a5c91c-StreamThread-15" #259 prio=5 os_prio=0 tid=0x00007f92e0037000 nid=0x27a8 runnable [0x00007f927d0e3000]
"ksql_query_CSAS_PAGEVIEWS_FEMALE-af068d1f-9ff4-4acb-abe7-e4ff37a5c91c-StreamThread-14" #258 prio=5 os_prio=0 tid=0x00007f92e0036000 nid=0x27a7 runnable [0x00007f927d1e4000]
"ksql_query_CSAS_PAGEVIEWS_FEMALE-af068d1f-9ff4-4acb-abe7-e4ff37a5c91c-StreamThread-13" #257 prio=5 os_prio=0 tid=0x00007f92e002d000 nid=0x27a6 runnable [0x00007f927dced000]
And the MapR Stream topic is created:
[root@v2 ~]# maprcli stream topic info -path /sample-stream -topic pageviews_female -json
{
 "timestamp":1536344466684,
 "timeofday":"2018-09-07 11:21:06.684 GMT-0700 AM",
 "status":"OK",
 "total":4,
 "data":[
  {
   "partitionid":0,
   "physicalsize":0,
   "logicalsize":0,
   "maxoffset":10,
   "minoffsetacrossconsumers":0,
   "mintimestamp":"2018-09-07T11:16:20.523-0700 AM",
   "maxtimestamp":"2018-09-07T11:21:00.328-0700 AM",
   "mintimestampacrossconsumers":"1969-12-31T04:00:00.000-0800 PM",
   "fid":"2078.69.262670",
   "master":"v1.poc.com:5660",
   "servers":"v2.poc.com:5660, v1.poc.com:5660, v4.poc.com:5660",
   "timestamptype":"CreateTime"
  },
  {
   "partitionid":1,
   "physicalsize":212992,
   "logicalsize":90112,
   "maxoffset":4,
   "minoffsetacrossconsumers":0,
   "mintimestamp":"2018-09-07T11:19:19.452-0700 AM",
   "maxtimestamp":"2018-09-07T11:20:57.326-0700 AM",
   "mintimestampacrossconsumers":"1969-12-31T04:00:00.000-0800 PM",
   "fid":"2092.32.262366",
   "master":"v2.poc.com:5660",
   "servers":"v2.poc.com:5660, v4.poc.com:5660, v3.poc.com:5660",
   "timestamptype":"CreateTime"
  },
  {
   "partitionid":2,
   "physicalsize":0,
   "logicalsize":0,
   "maxoffset":3,
   "minoffsetacrossconsumers":0,
   "mintimestamp":"2018-09-07T11:18:47.229-0700 AM",
   "maxtimestamp":"2018-09-07T11:20:52.785-0700 AM",
   "mintimestampacrossconsumers":"1969-12-31T04:00:00.000-0800 PM",
   "fid":"2182.96.262582",
   "master":"v4.poc.com:5660",
   "servers":"v1.poc.com:5660, v4.poc.com:5660, v3.poc.com:5660",
   "timestamptype":"CreateTime"
  },
  {
   "partitionid":3,
   "physicalsize":0,
   "logicalsize":0,
   "maxoffset":6,
   "minoffsetacrossconsumers":0,
   "mintimestamp":"2018-09-07T11:15:48.273-0700 AM",
   "maxtimestamp":"2018-09-07T11:20:13.588-0700 AM",
   "mintimestampacrossconsumers":"1969-12-31T04:00:00.000-0800 PM",
   "fid":"2080.41.262504",
   "master":"v1.poc.com:5660",
   "servers":"v2.poc.com:5660, v1.poc.com:5660, v3.poc.com:5660",
   "timestamptype":"CreateTime"
  }
 ]
}
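If you want to check this output from a script, the JSON can be parsed directly. The sketch below uses a trimmed copy of the output above (only partitionid, maxoffset and master are kept) and a hypothetical helper named max_offsets. Treating maxoffset as a sign that messages have landed in a partition is an assumption here, not something stated by the MapR documentation quoted in this post.

```python
import json

# Trimmed copy of the "maprcli stream topic info ... -json" output above.
topic_info = json.loads("""
{"status": "OK", "total": 4, "data": [
  {"partitionid": 0, "maxoffset": 10, "master": "v1.poc.com:5660"},
  {"partitionid": 1, "maxoffset": 4,  "master": "v2.poc.com:5660"},
  {"partitionid": 2, "maxoffset": 3,  "master": "v4.poc.com:5660"},
  {"partitionid": 3, "maxoffset": 6,  "master": "v1.poc.com:5660"}
]}
""")


def max_offsets(info):
    """Map each partition id to its reported maxoffset."""
    return {p["partitionid"]: p["maxoffset"] for p in info["data"]}


offsets = max_offsets(topic_info)
for partition_id, offset in sorted(offsets.items()):
    print(f"partition {partition_id}: maxoffset={offset}")
```

Running the maprcli command again a little later and comparing the values is a simple way to see whether the persistent query is still producing output.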
Since the result topic "pageviews_female" is written in JSON format, you can also query it with Drill. Refer to this link.
