Spark Structured Streaming - File-to-File Real-time Streaming (3/3)
CSV File to JSON File Real Time Streaming Example
In this post, we will build a simple application that processes data from CSV files to JSON files in real time.
Most of the clients I have worked with so far still rely on files, whether CSV, TSV, or JSON, and these files are usually processed in batches. In today's world of real-time processing, it makes sense to move these batch jobs to a streaming process that delivers results as soon as the data arrives.
The setup for this example is similar to all of our previous Spark examples.
Let’s build a use case
We have two directories:

- `src/main/resources/input/cutomer_info`, which contains a static file with customer information.
- `src/main/resources/input/orders`, into which CSV files with order details are dropped periodically. This directory is partitioned date-wise, as shown below.

```
Pavans-MacBook-Pro:Spark_Streaming_Examples pavanpkulkarni$ cd src/main/resources/input/
Pavans-MacBook-Pro:input pavanpkulkarni$ ls -ltr *
cutomer_info:
total 8
-rw-r--r--  1 pavanpkulkarni  staff  58 Jun 12 11:30 customer.csv

orders:
total 0
drwxr-xr-x  3 pavanpkulkarni  staff  96 Jun 12 11:30 date=2018-06-02
drwxr-xr-x  3 pavanpkulkarni  staff  96 Jun 12 11:54 date=2018-06-04
drwxr-xr-x  3 pavanpkulkarni  staff  96 Jun 12 11:57 date=2018-06-03
drwxr-xr-x  3 pavanpkulkarni  staff  96 Jun 12 13:00 date=2018-06-05
drwxr-xr-x  3 pavanpkulkarni  staff  96 Jul  2 12:08 date=2018-06-01
Pavans-MacBook-Pro:input pavanpkulkarni$
```
The objective is to join the order details from the CSV files with the customer information file and write the resulting data to JSON files in real time.
Let’s Talk Scala!
The customer information is as follows:
```
Pavans-MacBook-Pro:input pavanpkulkarni$ cat cutomer_info/customer.csv
id,name,location
1,kash,VT
2,pavan,IL
3,john,CA
4,jane,NJ
Pavans-MacBook-Pro:input pavanpkulkarni$
```
Sample CSV data with order information:
```
Pavans-MacBook-Pro:input pavanpkulkarni$ cat orders/date\=2018-06-01/data.csv
id,pid,pname,date
1,011,p11,2018-06-01
2,012,p12,2018-06-01
1,012,p12,2018-06-01
2,023,p23,2018-06-01
2,034,p34,2018-06-01
3,034,p34,2018-06-01
```
We begin by initializing the Spark session:
```scala
//initialize the spark session
val spark = SparkSession
  .builder()
  .master("local")
  .appName("File_Streaming")
  .getOrCreate()
```
In order to stream data from CSV files, we need to define a schema for the data; Spark will not allow streaming CSV data unless the schema is defined.
```scala
val schema = StructType(
  Array(
    StructField("customer_id", StringType),
    StructField("pid", StringType),
    StructField("pname", StringType),
    StructField("date", StringType)))

//stream the orders from the csv files.
val ordersStreamDF = spark
  .readStream
  .option("header", "true")
  .schema(schema)
  .csv(args(0))
```
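As an aside, for file-based sources Spark can infer the schema of the streaming data if the `spark.sql.streaming.schemaInference` setting is enabled, though an explicit schema (as above) is generally the safer choice. A minimal sketch of that option:

```scala
// Optional alternative: enable schema inference for file-based streaming sources.
// An explicitly defined schema is still recommended for production jobs.
spark.conf.set("spark.sql.streaming.schemaInference", "true")

val inferredOrdersDF = spark
  .readStream
  .option("header", "true")
  .csv(args(0))
```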
Read the customer information from the static file and store it in a static Dataset:
```scala
case class Customer(customer_id: String, customer_name: String, customer_location: String)

import spark.implicits._

//get the static customer data
val customerDS = spark.read
  .format("csv")
  .option("header", true)
  .load("src/main/resources/input/cutomer_info/customer.csv")
  .map(x => Customer(x.getString(0), x.getString(1), x.getString(2)))
```
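As a side note, the same static Dataset can be built without positional getString calls by renaming the CSV columns to match the case class fields and converting directly. A minimal equivalent sketch, reusing the same file and the Customer case class from above:

```scala
// Equivalent alternative: rename the CSV columns (id, name, location) to the
// Customer field names and convert the DataFrame to a typed Dataset.
val customerDSAlt = spark.read
  .format("csv")
  .option("header", true)
  .load("src/main/resources/input/cutomer_info/customer.csv")
  .toDF("customer_id", "customer_name", "customer_location")
  .as[Customer]
```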
Join the streaming dataframe `ordersStreamDF` with the static dataset `customerDS` on the `customer_id` field.

```scala
val finalResult = ordersStreamDF.join(customerDS, "customer_id")
```
The result is itself a streaming dataframe containing the joined order and customer records.
Using this streaming dataframe, we can write the data to any sink supported by Spark.
```scala
//write the joined stream to json/parquet output.
val query = finalResult
  .writeStream
  .queryName("count_customer")
  //.format("console")
  .outputMode("append")
  .format("json")
  .partitionBy("date")
  .option("path", "src/main/resources/output/")
  .option("checkpointLocation", "src/main/resources/chkpoint_dir")
  .start()
```
Here,
- `format("console")`: can be used for debugging purposes by printing the values to the console.
- `outputMode("append").format("json")`: write the output in append mode to JSON files.
- `partitionBy("date")`: the output is partitioned date-wise. Partitioning the data is a good strategy for improving performance.
- `option("path", "src/main/resources/output/")`: specify the output path where the data is dumped as JSON files. The partitioned directories are created based on the `date` field of the data.
- `option("checkpointLocation", "src/main/resources/chkpoint_dir")`: specify a checkpoint directory for fault tolerance.
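One thing the snippet above does not show: in a standalone application the driver needs to block until the streaming query terminates, otherwise the program can exit before any micro-batch has run. A minimal sketch, using the query handle returned by start():

```scala
// Block the driver until the streaming query is stopped or fails;
// without this, a standalone application would exit right after start().
query.awaitTermination()
```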
The full code can be found in my GitHub repo.
Let’s get Streaming Started!
- Run the project as a Scala application in your IDE.
Once the streaming job starts, you will see the `chkpoint_dir` and `output` directories created.

```
Pavans-MacBook-Pro:resources pavanpkulkarni$ ls -ltr
total 0
drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  2 12:08 input
drwxr-xr-x  8 pavanpkulkarni  staff  256 Jul  6 16:08 output
drwxr-xr-x  7 pavanpkulkarni  staff  224 Jul  6 16:08 chkpoint_dir
```
The `output` directory will now contain partitioned subdirectories based on the `date` field in the schema.

```
Pavans-MacBook-Pro:resources pavanpkulkarni$ ls -ltr output/
total 0
drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  6 16:08 date=2018-06-05
drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  6 16:08 date=2018-06-04
drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  6 16:08 date=2018-06-03
drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  6 16:08 date=2018-06-02
drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  6 16:08 date=2018-06-01
drwxr-xr-x  3 pavanpkulkarni  staff   96 Jul  6 16:08 _spark_metadata
```
N.B.: It is important to maintain the `/path/to/input/key=value` layout so that we achieve the desired partitioning. Here, `key` is a field name from the schema (`date` in our demo) and `value` is a value of that column.
Let’s look at the data for the partition `date=2018-06-01`.

```
Pavans-MacBook-Pro:resources pavanpkulkarni$ ls -ltra output/date\=2018-06-01/
total 16
-rw-r--r--  1 pavanpkulkarni  staff   16 Jul  6 16:07 .part-00000-56b3e0bd-705f-4b15-8a9f-61d6954dd1f2.c000.json.crc
-rw-r--r--  1 pavanpkulkarni  staff  567 Jul  6 16:07 part-00000-56b3e0bd-705f-4b15-8a9f-61d6954dd1f2.c000.json
drwxr-xr-x  8 pavanpkulkarni  staff  256 Jul  6 16:08 ..
drwxr-xr-x  4 pavanpkulkarni  staff  128 Jul  6 16:08 .
Pavans-MacBook-Pro:resources pavanpkulkarni$ cat output/date\=2018-06-01/part-00000-56b3e0bd-705f-4b15-8a9f-61d6954dd1f2.c000.json
{"customer_id":"1","pid":"011","pname":"p11","customer_name":"kash","customer_location":"VT"}
{"customer_id":"2","pid":"012","pname":"p12","customer_name":"pavan","customer_location":"IL"}
{"customer_id":"1","pid":"012","pname":"p12","customer_name":"kash","customer_location":"VT"}
{"customer_id":"2","pid":"023","pname":"p23","customer_name":"pavan","customer_location":"IL"}
{"customer_id":"2","pid":"034","pname":"p34","customer_name":"pavan","customer_location":"IL"}
{"customer_id":"3","pid":"034","pname":"p34","customer_name":"john","customer_location":"CA"}
```
As seen here, each JSON record combines the static customer information with the streaming order information.
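To double-check the partitioning, the output can be read back with a plain batch read; Spark's partition discovery reconstructs the date column from the `date=...` directory names even though the JSON records themselves no longer contain it. A minimal verification sketch, assuming the output path used above:

```scala
// Batch-read the partitioned JSON output; the "date" column is recovered
// from the date=... subdirectory names via partition discovery.
val writtenBack = spark.read.json("src/main/resources/output/")
writtenBack.printSchema()
writtenBack.groupBy("date").count().show()
```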
Bonus advantages of this application
The application does not stop here. It keeps getting better!
Imagine a situation where an additional file arrives for processing. We need to make sure this file is picked up, to avoid data loss. Let's go ahead and add a new data file to the `input/orders/date\=2018-06-01` directory.
```
Pavans-MacBook-Pro:resources pavanpkulkarni$ ls -ltr input/orders/date\=2018-06-01/
total 16
-rw-r--r--  1 pavanpkulkarni  staff  144 Jun 12 11:29 data.csv
-rw-r--r--  1 pavanpkulkarni  staff   79 Jul  6 16:32 data_new.csv
Pavans-MacBook-Pro:resources pavanpkulkarni$ cat input/orders/date\=2018-06-01/data_new.csv
id,pid,pname,date
2,012,p34,2018-06-01
3,003,p3,2018-06-01
4,004,p4,2018-06-01
```
As soon as the new file is detected by the Spark engine, a new micro-batch is triggered and the corresponding JSON output appears almost immediately. Best of all, the new JSON file is created in the same partition.
```
Pavans-MacBook-Pro:resources pavanpkulkarni$ ls -ltra output/date\=2018-06-01/
total 32
-rw-r--r--  1 pavanpkulkarni  staff   16 Jul  6 16:07 .part-00000-56b3e0bd-705f-4b15-8a9f-61d6954dd1f2.c000.json.crc
-rw-r--r--  1 pavanpkulkarni  staff  567 Jul  6 16:07 part-00000-56b3e0bd-705f-4b15-8a9f-61d6954dd1f2.c000.json
-rw-r--r--  1 pavanpkulkarni  staff   12 Jul  6 16:32 .part-00000-70a945d1-2397-416b-8b80-e96e7c31758d.c000.json.crc
-rw-r--r--  1 pavanpkulkarni  staff  281 Jul  6 16:32 part-00000-70a945d1-2397-416b-8b80-e96e7c31758d.c000.json
drwxr-xr-x  8 pavanpkulkarni  staff  256 Jul  6 16:32 ..
drwxr-xr-x  6 pavanpkulkarni  staff  192 Jul  6 16:32 .
```
The checkpoint directory maintains the state of the engine, so from then on only new files are processed. This avoids re-running the job for every new or late-arriving file. Another advantage is that the Spark engine stays idle until data actually arrives, saving computation power and therefore cost.
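To stop the job and resume it later, the query can be stopped explicitly and the same writeStream call re-run with the identical checkpointLocation; Structured Streaming then continues from the recorded state rather than reprocessing all the input files. A minimal sketch, reusing the query and finalResult values from above:

```scala
// Stop the running query.
query.stop()

// Restarting with the same checkpoint location resumes from the last committed
// state, so input files that were already processed are not read again.
val resumedQuery = finalResult
  .writeStream
  .outputMode("append")
  .format("json")
  .partitionBy("date")
  .option("path", "src/main/resources/output/")
  .option("checkpointLocation", "src/main/resources/chkpoint_dir")
  .start()
```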
All of the files can be viewed in my GitHub repo.