Friday, December 6, 2019

Spark Streaming sample scala code for different sources

Goal:

This article shares sample Spark Streaming Scala code for four different sources: socket text, text files in a MapR-FS directory, a Kafka broker, and MapR Event Store for Apache Kafka (MapR Streams).
Each is a word-count example that can be run directly from spark-shell.

Env:

MapR 6.1
mapr-spark-2.3.2.0
mapr-kafka-1.1.1
mapr-kafka-ksql-4.1.1

Solution:

1. Socket text

Data source:
Open a socket listener on port 9999 and type some words as the data source:
nc -lk 9999
Sample Code:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}

// Quiet the logs so the batch output is readable in spark-shell
Logger.getRootLogger.setLevel(Level.WARN)

// Reuse spark-shell's SparkContext (sc) with a 10-second batch interval
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
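In spark-shell there is no need to call ssc.awaitTermination(), because the shell itself keeps the driver alive. To stop the stream later without killing the shell (a StreamingContext cannot be restarted once stopped, so create a new one afterwards), you can run:
ssc.stop(stopSparkContext = false, stopGracefully = true)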

2. Text files in a MapR-FS directory

Data source:
Create a directory on MapR-FS and put text files into it as the data source:
hadoop fs -mkdir /tmp/textfile
hadoop fs -put /opt/mapr/NOTICE.txt /tmp/textfile/
Sample Code:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}

// Quiet the logs so the batch output is readable in spark-shell
Logger.getRootLogger.setLevel(Level.WARN)

val ssc = new StreamingContext(sc, Seconds(10))
// Monitor the directory for files created after the stream starts
val lines = ssc.textFileStream("/tmp/textfile")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
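Note: textFileStream only picks up files created in the directory after the stream starts, so put (or atomically move) new files into /tmp/textfile once ssc.start() has run. Also, each batch prints only that batch's counts; if you want running totals across batches, a minimal stateful sketch (declare it before ssc.start(); the checkpoint path /tmp/spark-checkpoint is just an example):
ssc.checkpoint("/tmp/spark-checkpoint")
// Add each batch's counts for a word to its running total
val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}
runningCounts.print()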

3. Kafka broker

Data source:
Assuming a Kafka broker is already running:
./bin/kafka-server-start.sh ./config/server.properties
Create a new topic named "mytopic":
./bin/kafka-topics.sh --create --zookeeper localhost:5181 --replication-factor 1 --partitions 1 --topic mytopic
Start a Kafka console producer and type some words as the data source:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
Or use the performance-test producer below:
./kafka-producer-perf-test.sh --topic mytopic --num-records 1000000 --record-size 1000 \
--throughput 10000 --producer-props bootstrap.servers=localhost:9092
Sample Code:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.GROUP_ID_CONFIG -> "mysparkgroup",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  // Disable auto commit; the application decides if and when offsets are committed
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)

val topics = Array("mytopic")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
// PreferConsistent spreads partitions evenly across the available executors
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  consumerStrategy)

val lines = messages.map(_.value())
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
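Because enable.auto.commit is false above, offsets are not committed anywhere by default. If you want to commit each batch's offsets back to Kafka after it is processed, a minimal sketch using the kafka010 commit API (register this before ssc.start()):
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

messages.foreachRDD { rdd =>
  // Capture the offset ranges for this batch, then commit them asynchronously
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}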

4. MapR Event Store for Apache Kafka (MapR Streams)

Data source:
Create a sample MapR stream named /sample-stream:
maprcli stream create -path /sample-stream -produceperm p -consumeperm p -topicperm p
Use the ksql-datagen tool mentioned in this blog to generate the data:
/opt/mapr/ksql/ksql-4.1.1/bin/ksql-datagen quickstart=pageviews format=delimited topic=/sample-stream:pageviews maxInterval=10000
Or use the performance-test producer below:
./kafka-producer-perf-test.sh --topic /sample-stream:pageviews --num-records 1000000 --record-size 10000 \
--throughput 10000 --producer-props bootstrap.servers=localhost:9092

Sample code:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))

val kafkaParams = Map[String, Object](
  // No bootstrap.servers entry is needed: the MapR client locates the stream by its path
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.GROUP_ID_CONFIG -> "mysparkgroup",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  // Disable auto commit; the application decides if and when offsets are committed
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)

// A MapR Streams topic is addressed as <stream path>:<topic name>
val topics = Array("/sample-stream:pageviews")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  consumerStrategy)

val lines = messages.map(_.value())
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
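The same pipeline can also count over a sliding window instead of a single 10-second batch; a minimal sketch (declare it before ssc.start(); the window and slide durations must be multiples of the batch interval):
// Count words seen in the last 60 seconds, updated every 20 seconds
val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(20))
windowedCounts.print()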

