Spark Structured Streaming - Socket Word Count (2/3)
Socket Word Count demo for Spark Structured Streaming
Structured Streaming is a new way of looking at real-time streaming. In this post we will see how to build our very first Structured Streaming app to perform a word count over a network socket.
Socket Word Count
Initialize the Spark session as
import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .master("local")
  .appName("Socket_Streaming")
  .getOrCreate()
Next, we need to initialize a ReadStream
val socketStreamDf = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
Let's look into the details. Here,
- readStream : returns an org.apache.spark.sql.streaming.DataStreamReader. This is used to load streaming data from external storage systems
- format("socket") : the source of our streaming data is a socket
- .option("host", "localhost").option("port", 9999) : these options specify the hostname and port number of the socket
The DataFrame socketStreamDf is now a streaming DataFrame, and we can perform all the usual DataFrame operations on socketStreamDf.
Now, we go ahead and perform WordCount on this DataFrame
import spark.implicits._
val words = socketStreamDf.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
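To see what these two transformations compute, here is a minimal sketch of the same logic on an ordinary Scala collection instead of a streaming DataFrame. The object name WordCountSketch and the sample lines are made up for illustration. Spark performs the equivalent steps incrementally over the socket input.

```scala
// Hypothetical stand-alone sketch: plain Scala collections, no Spark required.
object WordCountSketch {
  // Same shape as flatMap(_.split(" ")) followed by groupBy("value").count()
  def countWords(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.split(" "))   // one element per space-separated token
      .groupBy(identity)       // group identical words together
      .map { case (word, occurrences) => word -> occurrences.size.toLong }

  def main(args: Array[String]): Unit = {
    val lines = Seq("hi welcome to spark streaming", "welcome again")
    countWords(lines).toSeq.sortBy(_._1).foreach { case (word, count) =>
      println(s"$word -> $count")
    }
  }
}
```

Because the socket source delivers each line as a single string column named value, grouping by "value" in the Spark version plays the role of groupBy(identity) here.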
We then go ahead and create a streaming query that will output the word counts to the console sink.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
Let's look into the details,
- writeStream : returns an org.apache.spark.sql.streaming.DataStreamWriter. This is used to write a streaming DataFrame out to external storage systems
- format("console") : the sink that receives the results is the console
- .outputMode("complete") : this specifies that the entire updated result table is written to the sink after every trigger
- start() : finally, we start the streaming process.
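The effect of the "complete" output mode can be illustrated without Spark. The sketch below is a plain Scala simulation, not Spark's implementation: it keeps running totals across micro-batches and produces the whole result table after each one. CompleteModeSketch, merge, run, and the sample batches are hypothetical names and data chosen for illustration.

```scala
// Hypothetical simulation of "complete" output mode; not how Spark is implemented.
object CompleteModeSketch {
  type Counts = Map[String, Long]

  // Fold one micro-batch of lines into the running totals.
  def merge(state: Counts, batch: Seq[String]): Counts =
    batch.flatMap(_.split(" ")).foldLeft(state) { (acc, word) =>
      acc.updated(word, acc.getOrElse(word, 0L) + 1L)
    }

  // One result table per trigger; each table contains every word seen so far.
  def run(batches: Seq[Seq[String]]): Seq[Counts] =
    batches.scanLeft(Map.empty[String, Long])(merge).tail

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("hi welcome to spark streaming"), Seq("welcme"), Seq("welcome again"))
    run(batches).zipWithIndex.foreach { case (table, i) =>
      println(s"Batch: $i")
      table.toSeq.sortBy(_._1).foreach { case (word, count) => println(s"$word -> $count") }
    }
  }
}
```

Each printed table repeats the words from earlier batches with their updated counts, which is why the console output of the real query grows as more text arrives.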
The StreamingQuery object created when a query is started can be used to monitor and manage the query. We call awaitTermination() to make the app block until the query is terminated, either by stop() or by an error.
query.awaitTermination()
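As a rough sketch of what monitoring and managing the query can look like, the example below uses the timed variant of awaitTermination together with isActive, status, lastProgress, and stop() on the StreamingQuery. It assumes Spark SQL is on the classpath and that something is already listening on the given host and port. The object name QueryMonitorSketch, the run helper, and the 10-second timeout are arbitrary choices for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes a server (for example `nc -lk 9999`) is already listening.
object QueryMonitorSketch {
  // Runs the word count for at most timeoutMs milliseconds, then stops it.
  // Returns true if the query had already terminated before the timeout.
  def run(host: String, port: Int, timeoutMs: Long): Boolean = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("Socket_Streaming")
      .getOrCreate()
    import spark.implicits._

    val wordCounts = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
      .as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    // Block for at most timeoutMs instead of blocking forever.
    val finished = query.awaitTermination(timeoutMs)
    println(s"terminated=$finished, active=${query.isActive}")
    println(query.status)        // current state of the query
    println(query.lastProgress)  // metrics of the most recent trigger; null if none yet

    query.stop()                 // stop the query explicitly
    spark.stop()
    finished
  }

  def main(args: Array[String]): Unit = run("localhost", 9999, 10000L)
}
```

The timed call is convenient for demos and tests. A long-running application would normally keep the plain query.awaitTermination() shown above.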
Full code is available in my GitHub Repo
TO RUN
Open Terminal and type
Pavans-MacBook-Pro:Spark_Streaming_Examples pavanpkulkarni$ nc -lk 9999
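If nc is not available on your machine, a small stand-in can be sketched in Scala. This is a substitute written for illustration, not part of the original demo: LineServerSketch and serve are hypothetical names, and unlike nc -lk it serves a single client and then exits. It forwards every line typed on standard input to whoever connects on port 9999.

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.ServerSocket

// Hypothetical stand-in for `nc -lk 9999`: sends lines to one connected client.
object LineServerSketch {
  // Accept one client on `server` and send it every line produced by `lines`.
  def serve(server: ServerSocket, lines: Iterator[String]): Unit = {
    val client = server.accept()                             // blocks until a client connects
    val out = new PrintWriter(client.getOutputStream, true)  // autoflush on println
    try lines.foreach(line => out.println(line))
    finally client.close()
  }

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    val stdin = new BufferedReader(new InputStreamReader(System.in))
    // Forward typed lines until end of input (Ctrl-D).
    val typed = Iterator.continually(stdin.readLine()).takeWhile(_ != null)
    try serve(server, typed)
    finally server.close()
  }
}
```

Start it before the Spark application, exactly as you would start nc, and type into its terminal.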
Run the Spark application. Make sure you run the above nc -lk command before running the streaming application. Start typing in the terminal
Pavans-MacBook-Pro:Spark_Streaming_Examples pavanpkulkarni$ nc -lk 9999
hi welcome to spark streaming welcme welcome again
You will see the output in the console as:
-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----+
|    value|count|
+---------+-----+
|  welcome|    2|
|streaming|    1|
|    again|    1|
|    spark|    1|
|       hi|    1|
|   welcme|    1|
|       to|    1|
+---------+-----+
The word counts keep updating in real time as you keep typing in the terminal, and the full table is printed again for each new batch.