Friday, December 6, 2019

Spark Streaming sample scala code for different sources

Goal:

This article shares sample Spark Streaming Scala code for four different sources: socket text, text files in a MapR-FS directory, a Kafka broker, and MapR Event Store for Apache Kafka (MapR Streams).
Each example is a word-count program that can be run directly from spark-shell.
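The transformation chain is the same in every example. What it computes within a single micro-batch can be sketched with plain Scala collections (no Spark needed), using groupBy in place of reduceByKey:

```scala
// One micro-batch of lines, as a plain Scala collection.
val batch = Seq("spark streaming spark", "hello streaming")

// Same pipeline as the DStream version: split into words,
// pair each word with 1, and sum the counts per word.
val counts = batch
  .flatMap(_.split(" "))
  .map((_, 1))
  .groupBy(_._1)
  .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

// counts: Map(spark -> 2, streaming -> 2, hello -> 1)
```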

Env:

MapR 6.1
mapr-spark-2.3.2.0
mapr-kafka-1.1.1
mapr-kafka-ksql-4.1.1

Solution:

1. socket text

Data source:
Open a socket on port 9999 and type some words as the data source:
nc -lk 9999
Sample Code:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}

// Reduce log noise so the word counts are visible in spark-shell.
Logger.getRootLogger.setLevel(Level.WARN)
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
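Note that reduceByKey counts words within each 10-second batch only. If a running total across batches is wanted, a stateful variant can be sketched on top of the pairs DStream above (the checkpoint path is an assumption; any writable directory works):

```scala
// Stateful operations require a checkpoint directory.
ssc.checkpoint("/tmp/spark-checkpoint")

// Fold each batch's counts into the running total per word.
val totals = pairs.updateStateByKey[Int] { (newCounts: Seq[Int], state: Option[Int]) =>
  Some(newCounts.sum + state.getOrElse(0))
}
totals.print()
```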

2. text files in MapR-FS directory

Data source:
Create a directory on MapR-FS and put text files inside as the data source. Note that textFileStream only picks up files that appear in the directory after the stream has started, so run the put command after ssc.start().
hadoop fs -mkdir /tmp/textfile
hadoop fs -put /opt/mapr/NOTICE.txt /tmp/textfile/
Sample Code:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}

// Reduce log noise so the word counts are visible in spark-shell.
Logger.getRootLogger.setLevel(Level.WARN)
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.textFileStream("/tmp/textfile")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()

3. kafka broker

Data source:
Assume an existing Kafka server is already started:
./bin/kafka-server-start.sh ./config/server.properties
Create a new topic named "mytopic":
./bin/kafka-topics.sh --create --zookeeper localhost:5181 --replication-factor 1 --partitions 1 --topic mytopic
Start a Kafka console producer and type some words as the data source:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
Or use the performance-test producer:
./kafka-producer-perf-test.sh  --topic mytopic --num-records 1000000 --record-size 1000 \
--throughput 10000 --producer-props bootstrap.servers=localhost:9092 
Sample Code:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.GROUP_ID_CONFIG -> "mysparkgroup",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG  -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)

val topics = Array("mytopic")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      consumerStrategy)

val lines = messages.map(_.value())
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
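Since enable.auto.commit is set to false above, offsets are never committed back to Kafka. If offset tracking in Kafka is wanted, the kafka010 API can commit each batch's offset ranges; a minimal sketch (set up before ssc.start()):

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

messages.foreachRDD { rdd =>
  // Grab the offset ranges for this batch and commit them asynchronously.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```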

4. MapR Event Store for Apache Kafka (MapR Streams)

Data source:
Create a sample MapR stream named /sample-stream:
maprcli stream create -path /sample-stream -produceperm p -consumeperm p -topicperm p
Use the ksql-datagen tool (from mapr-kafka-ksql) to generate the data:
/opt/mapr/ksql/ksql-4.1.1/bin/ksql-datagen quickstart=pageviews format=delimited topic=/sample-stream:pageviews maxInterval=10000
Or use the performance-test producer:
./kafka-producer-perf-test.sh  --topic /sample-stream:pageviews --num-records 1000000 --record-size 10000 \
--throughput 10000 --producer-props bootstrap.servers=localhost:9092

Sample Code:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))

val kafkaParams = Map[String, Object](
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.GROUP_ID_CONFIG -> "mysparkgroup",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG  -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)

val topics = Array("/sample-stream:pageviews")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      consumerStrategy)

val lines = messages.map(_.value())
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
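To stop any of the streams above cleanly from spark-shell while keeping the SparkContext usable, stop the StreamingContext gracefully:

```scala
// Let in-flight batches finish, then stop streaming but keep sc alive.
ssc.stop(stopSparkContext = false, stopGracefully = true)
```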

