Thursday, April 21, 2016

MapR Stream Workshop 3: Consumer Group and Cursor

Theory:

Per the documentation on consumer groups:
You can group consumers together by setting the same value for the group.id configuration parameter when you start each consumer.
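
As a rough illustration, the consumer-side configuration might look like the sketch below. The source of SampleConsumer_CG1 is not shown in this post, so the class name ConsumerGroupConfig and the deserializer settings are assumptions; only the group.id key and the value CG1 come from the text.

```java
import java.util.Properties;

// Hypothetical helper; the real SampleConsumer_CG1 source is not shown in this post.
class ConsumerGroupConfig {
    // Builds consumer properties whose group.id places the consumer in the given group.
    static Properties forGroup(String groupId) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // Assumed deserializers (standard Kafka client class names).
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = forGroup("CG1");
        System.out.println("group.id = " + props.getProperty("group.id"));
    }
}
```

Every consumer started with the same group.id value would join the same group; the properties object is what gets handed to the consumer when it is constructed.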


Per the documentation on cursors:
  • Read cursors
    A consumer's read cursor is the offset of the most recent message that MapR Streams has sent to a consumer from a partition.
  • Committed cursors
    Consumers that are part of a consumer group can save the current position of their read cursor. Consumers can do this either automatically or manually. The saved cursor is called a committed cursor because it indicates that the consumer has processed all messages in a partition up to and including the one with this offset. 
Note: Persistent (committed) cursors require a consumer group.
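
The two definitions above can be pictured with a toy model. This is only a sketch, not MapR Streams code: the class PartitionCursors and its method names are made up for illustration, and the cursor values follow the quoted definitions (the offset of the last delivered message).

```java
// Toy model of the two cursors for one partition; not MapR Streams code.
class PartitionCursors {
    private long readCursor = -1;       // offset of the most recent message delivered
    private long committedCursor = -1;  // last saved copy of the read cursor

    // Called when the stream delivers a message to the consumer.
    void onDelivered(long offset) {
        readCursor = offset;
    }

    // Saves the current read position, as an automatic or manual commit would.
    void commit() {
        committedCursor = readCursor;
    }

    long readCursor() {
        return readCursor;
    }

    long committedCursor() {
        return committedCursor;
    }

    public static void main(String[] args) {
        PartitionCursors cursors = new PartitionCursors();
        cursors.onDelivered(10);
        cursors.onDelivered(11);
        cursors.commit();        // committed cursor catches up to the read cursor
        cursors.onDelivered(12); // read cursor moves on; committed cursor stays put
        System.out.println("read=" + cursors.readCursor()
                + " committed=" + cursors.committedCursor());
    }
}
```

The read cursor advances with every delivered message, while the committed cursor only moves when a commit happens, so the two can differ between commits.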

Experiment:

Here we set the parameter "group.id" to "CG1" in SampleConsumer_CG1.
To keep things simple, the topic has only one partition, and Consumer Group "CG1" starts only one consumer.

Session A: Producer -- SampleProducer_1sec is running.
mapr openkb.stream.SampleProducer_1sec
Session B: Consumer with group.id="CG1" --  SampleConsumer_CG1 is also running. This is what we will play with.
mapr openkb.stream.SampleConsumer_CG1

1. Kill the Consumer using "CTRL-C".

This simulates a consumer failure.
 Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366086948, timestamp = 1461255947151, producer = root, key = null, value = Msg 146)
 Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366086949, timestamp = 1461255947151, producer = root, key = null, value = Msg 147)
^C
From the last line of the Consumer output above, we know that the latest message read, "Msg 147", is at offset 366086949. So the read cursor should be pointing to the next offset -- 366086950.

2. Check committed cursor of the topic for Consumer Group "CG1".

# maprcli stream cursor list -path /stream/s1 -topic info -consumergroup CG1
topic  partitionid  consumertimestamp             produceroffset  consumergroup  consumerlagmillis  committedoffset  producertimestamp
info   0            2016-04-21T11:25:47.151-0500  366087192       CG1            6004               366086950        2016-04-21T11:25:53.155-0500
Right now, the committed cursor is the same as the read cursor -- 366086950.
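
The consumerlagmillis column appears to be the difference between the producertimestamp and consumertimestamp columns. That is an inference from the numbers in the output above, not something the quoted documentation states. A minimal sketch of the arithmetic, with CursorLag as a made-up class name:

```java
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that recomputes the lag from the two timestamp columns.
class CursorLag {
    // Timestamp layout used in the maprcli output, e.g. 2016-04-21T11:25:47.151-0500
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZ");

    // Milliseconds between the consumer timestamp and the producer timestamp.
    static long lagMillis(String consumerTimestamp, String producerTimestamp) {
        OffsetDateTime consumed = OffsetDateTime.parse(consumerTimestamp, FMT);
        OffsetDateTime produced = OffsetDateTime.parse(producerTimestamp, FMT);
        return Duration.between(consumed, produced).toMillis();
    }

    public static void main(String[] args) {
        // Values copied from the cursor listing above.
        long lag = lagMillis("2016-04-21T11:25:47.151-0500",
                             "2016-04-21T11:25:53.155-0500");
        System.out.println("lag in ms = " + lag);
    }
}
```

For the row above this gives 6004, which matches the consumerlagmillis value reported by maprcli.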

3. Restart Consumer and kill it immediately with "CTRL-C".

The consumer successfully resumes from the first message whose offset is >= 366086950 (the committed cursor).
Note: Offsets are not necessarily contiguous; there can be a gap between two adjacent offsets.
In this case, the next available message, "Msg 148", is at offset 366087029.
 Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087029, timestamp = 1461255950153, producer = root, key = null, value = Msg 148)
 Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087030, timestamp = 1461255950153, producer = root, key = null, value = Msg 149)
 Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087031, timestamp = 1461255950153, producer = root, key = null, value = Msg 150)
^C
From the last line of the Consumer output above, we know that the latest message read, "Msg 150", is at offset 366087031. So the read cursor should be pointing to the next offset -- 366087032.

4. Check committed cursor again.

# maprcli stream cursor list -path /stream/s1 -topic info -consumergroup CG1
topic  partitionid  consumertimestamp             produceroffset  consumergroup  consumerlagmillis  committedoffset  producertimestamp
info   0            2016-04-21T11:25:47.151-0500  366087684       CG1            24016              366086950        2016-04-21T11:26:11.167-0500
Because we killed the consumer so quickly, it did not have time to update the committed cursor. So the committed cursor is still at offset 366086950.

5. Restart Consumer again.

 Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087029, timestamp = 1461255950153, producer = root, key = null, value = Msg 148)
 Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087030, timestamp = 1461255950153, producer = root, key = null, value = Msg 149)
 Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087031, timestamp = 1461255950153, producer = root, key = null, value = Msg 150)
The consumer starts again from "Msg 148" at offset 366087029, re-reading the messages it already consumed in step 3.

This test shows that:
a. Duplicate messages can be read after a consumer failure.
b. The committed cursor can lag behind the read cursor.
c. A restarted consumer starts reading from the committed cursor, not from the read cursor.
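
The restart behaviour seen in steps 3 and 5 can be replayed with a small simulation. This is a sketch under stated assumptions, not how MapR Streams is implemented: the partition is modelled as a sorted map from offset to message, the committed offset is treated as the next offset to read (as the maprcli output in this experiment suggests), and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simulation only: models one partition as a sorted map of offset -> message.
class RestartFromCommitted {
    // Returns up to max messages whose offset is >= committedOffset, in offset order.
    static List<String> readFrom(TreeMap<Long, String> partition,
                                 long committedOffset, int max) {
        List<String> out = new ArrayList<>();
        for (String value : partition.tailMap(committedOffset, true).values()) {
            if (out.size() == max) {
                break;
            }
            out.add(value);
        }
        return out;
    }

    // Offsets and messages copied from the consumer output in this experiment.
    static TreeMap<Long, String> samplePartition() {
        TreeMap<Long, String> partition = new TreeMap<>();
        partition.put(366086948L, "Msg 146");
        partition.put(366086949L, "Msg 147");
        partition.put(366087029L, "Msg 148"); // note the gap after 366086949
        partition.put(366087030L, "Msg 149");
        partition.put(366087031L, "Msg 150");
        return partition;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> partition = samplePartition();
        long committed = 366086950L;
        // Step 3: three messages are read, then the consumer dies before committing.
        System.out.println(readFrom(partition, committed, 3));
        // Step 5: the committed cursor is unchanged, so the same messages come back.
        System.out.println(readFrom(partition, committed, 3));
    }
}
```

Both calls return Msg 148 through Msg 150, which is the duplicate delivery described in point a. The lookup by "first offset >= committed" also covers the gap between offsets 366086949 and 366087029.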
