Spark - MongoDB Data Processing (Scala)
Processing data from MongoDB in a distributed environment with Apache Spark
In this post, we will look at the basics of processing data from MongoDB using Apache Spark.
The MongoDB Connector for Spark provides integration between MongoDB and Apache Spark.
With the connector, you have access to all Spark libraries for use with MongoDB datasets: Datasets for analysis with SQL (benefiting from automatic schema inference), streaming, machine learning, and graph APIs. You can also use the connector with the Spark Shell.
Why use Apache Spark?
Rich Operators and Algorithms: Spark supports many operators that are not natively available in the mongo shell. Along with these operators, Spark also brings optimized algorithms for processing huge volumes of data.
Distributed Platform: Although MongoDB provides a distributed environment for storing documents, processing large amounts of data in it becomes complex. Introducing Apache Spark at this point eases the computation. Also, the Mongo Spark drivers are compatible with most commonly used programming languages, viz. Scala, Java, Python and R.
Together, MongoDB and Apache Spark are enabling success by turning analytics into real-time action.
Let’s look at some examples of basic read and write operations to and from MongoDB in Scala. The full code is available on my GitHub repo.
Set up Mongo-Spark Drivers
I will be using the latest version of the mongo-spark-connector.
We have already seen how to run Spark in Eclipse using Gradle.
Add the mongo-spark-connector driver to build.gradle as shown below. You can check for the latest version here.
dependencies {
    provided 'org.apache.spark:spark-core_2.11:2.2.1'
    provided 'org.apache.spark:spark-sql_2.11:2.2.1'
    provided 'org.apache.spark:spark-catalyst_2.11:2.2.1'
    compile group: 'org.mongodb.spark', name: 'mongo-spark-connector_2.11', version: '2.2.2'
}
I’m using Scala 2.11 and Spark 2.2.1, as these were the latest versions available at the time this post was published.
Initialize SparkSession
Starting with Apache Spark 2.0.0, the entry point to a Spark job changed from SparkContext to SparkSession. SparkSession combines the customization options of SparkContext and Spark SQL.
val spark = SparkSession
.builder()
.master("local")
.appName("Spark_Mongo")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/super_hero_db.students")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/super_hero_db.students")
.getOrCreate()
Here I have:
- initialized the Spark job entry point
- set the Spark master
- initialized the connection to MongoDB (more on the two URIs below)
- set a name for my Spark job
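The input and output URIs are independent, so a job can read from one collection and write its results to another. A minimal sketch, where students_out is a hypothetical collection name of my own:

import org.apache.spark.sql.SparkSession

// Hypothetical variant: read from students, write results to students_out.
val sparkIO = SparkSession
    .builder()
    .master("local")
    .appName("Spark_Mongo")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/super_hero_db.students")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/super_hero_db.students_out")
    .getOrCreate()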
Read From MongoDB
Let’s use the spark SparkSession to read values from Mongo. I have one collection, students, in MongoDB; the sample data can be seen in this post.
import com.mongodb.spark.MongoSpark

val studentsDF = MongoSpark.load(spark)
studentsDF.show(false)
This returns a DataFrame with the students data:
studentsDF.show(false)
+--------------------------+-------------------------------------------------------------+----+----------------+--------------+
|_id |courses_registered |id |name |year_graduated|
+--------------------------+-------------------------------------------------------------+----+----------------+--------------+
|[5afc9b45ef01bb656bfe5fd3]|[[CS001,Spring_2001], [CS002,Summer_2001], [CS001,Fall_2001]]|1.0 |Tom Riddle |2001 |
|[5afc9b45ef01bb656bfe5fd4]|[[CS003,Spring_2002], [CS004,Summer_2002], [CS005,Fall_2002]]|3.0 |Haan Solo |2002 |
|[5afc9b45ef01bb656bfe5fd5]|[[CS004,Spring_2004], [CS005,Summer_2004], [CS003,Fall_2004]]|5.0 |Sheldon Cooper |2004 |
|[5afc9b45ef01bb656bfe5fd6]|[[CS009,Spring_2005], [CS006,Summer_2005], [CS004,Fall_2005]]|6.0 |Iron Man |2005 |
|[5afc9b45ef01bb656bfe5fd7]|[[CS004,Spring_2006], [CS005,Summer_2006], [CS003,Fall_2006]]|7.0 |Stephan Hawkings|2006 |
|[5afc9b45ef01bb656bfe5fd8]|[[CS001,Spring_2007], [CS003,Summer_2007], [CS009,Fall_2007]]|8.0 |Cerci Lannister |2007 |
|[5afc9b45ef01bb656bfe5fd9]|[[CS006,Spring_2008], [CS007,Summer_2008], [CS009,Fall_2008]]|9.0 |Wonder Woman |2008 |
|[5afc9b45ef01bb656bfe5fda]|[[CS009,Spring_2003], [CS010,Summer_2003], [CS004,Fall_2003]]|4.0 |Frodo Baggins |2003 |
|[5afc9b45ef01bb656bfe5fdb]|[[CS001,Spring_2010], [CS002,Summer_2010], [CS005,Fall_2010]]|11.0|Peter Parker |2010 |
|[5afc9b45ef01bb656bfe5fdc]|[[CS010,Spring_2009], [CS002,Summer_2009], [CS007,Fall_2009]]|10.0|Hermione Granger|2009 |
|[5afc9b45ef01bb656bfe5fdd]|[[CS003,Spring_2002], [CS004,Summer_2002], [CS005,Fall_2002]]|2.0 |Ned Stark |2002 |
+--------------------------+-------------------------------------------------------------+----+----------------+--------------+
studentsDF.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- courses_registered: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- CID: string (nullable = true)
| | |-- cid: string (nullable = true)
| | |-- sem: string (nullable = true)
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- year_graduated: string (nullable = true)
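Because courses_registered is an array of structs, we can use Spark's explode function to flatten it into one row per registered course. A minimal sketch using the column names from the schema above:

import org.apache.spark.sql.functions.{col, explode}

// One row per (student, course) pair. Note that some documents use CID
// instead of cid, so one of the two will be null for any given row.
val coursesDF = studentsDF
    .select(col("name"), explode(col("courses_registered")).as("course"))
    .select(col("name"), col("course.cid"), col("course.sem"))
coursesDF.show(false)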
N.B.: The _id field is different from the id field; _id is automatically generated by MongoDB when a new document is inserted.
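Since studentsDF is an ordinary DataFrame, the SQL integration mentioned at the start comes for free. A small sketch:

// Register a temp view and query it with Spark SQL.
// year_graduated is a string, so this is a lexicographic comparison
// (which works fine for four-digit years).
studentsDF.createOrReplaceTempView("students")
spark.sql("SELECT name, year_graduated FROM students WHERE year_graduated >= '2005'").show(false)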
Write Data to Mongo
Writing to MongoDB using the mongo-spark connector is very simple.
I have used SaveMode.Overwrite in this example. Note that with the connector, Overwrite drops the existing collection before writing, whereas SaveMode.Append adds documents while keeping the existing ones.
Here, I will create a DataFrame and use it to insert data into Mongo. The steps to follow are:
Create case classes for the required UDT.
case class students_cc(id: Int, year_graduated: String, courses_registered: List[cid_sem], name: String)
case class cid_sem(cid: String, sem: String)
Create a DataFrame based off the case classes.
val listOfCoursesSem = List(
    cid_sem("CS003", "Spring_2011"),
    cid_sem("CS006", "Summer_2011"),
    cid_sem("CS009", "Fall_2011"))

val newStudents = Seq(students_cc(12, "2011", listOfCoursesSem, "Black Panther")).toDF()
N.B.: To use toDF(), we need to make use of our old friend import spark.implicits._.
Write the DataFrame to MongoDB.
import org.apache.spark.sql.SaveMode
MongoSpark.save(newStudents.write.mode(SaveMode.Overwrite))
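Alternatively, the same write can be expressed through the generic DataFrameWriter API. A sketch using SaveMode.Append, which adds documents without dropping the collection first:

// The connector's data source; it picks up spark.mongodb.output.uri
// unless overridden via options.
newStudents.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode(SaveMode.Append)
    .option("collection", "students")
    .save()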
Verify that the insert was successful.
val studentsData = MongoSpark.load(spark)
studentsData.show(false)

+--------------------------+----------------------------------------------------------------------------+---+----------------+--------------+
|_id                       |courses_registered                                                          |id |name            |year_graduated|
+--------------------------+----------------------------------------------------------------------------+---+----------------+--------------+
|[5afcc577aebca2bc98a7135e]|[[null,CS003,Spring_2011], [null,CS006,Summer_2011], [null,CS009,Fall_2011]]|12 |Black Panther   |2011          |
|[5afcc674ef01bb656bfe6066]|[[CS003,null,Spring_2002], [CS004,null,Summer_2002], [CS005,null,Fall_2002]]|3  |Haan Solo       |2002          |
|[5afcc674ef01bb656bfe6067]|[[CS003,null,Spring_2002], [CS004,null,Summer_2002], [CS005,null,Fall_2002]]|2  |Ned Stark       |2002          |
|[5afcc674ef01bb656bfe6068]|[[CS001,null,Spring_2001], [CS002,null,Summer_2001], [CS001,null,Fall_2001]]|1  |Tom Riddle      |2001          |
|[5afcc674ef01bb656bfe6069]|[[CS004,null,Spring_2004], [CS005,null,Summer_2004], [CS003,null,Fall_2004]]|5  |Sheldon Cooper  |2004          |
|[5afcc674ef01bb656bfe606a]|[[CS004,null,Spring_2006], [CS005,null,Summer_2006], [CS003,null,Fall_2006]]|7  |Stephan Hawkings|2006          |
|[5afcc674ef01bb656bfe606b]|[[CS009,null,Spring_2003], [CS010,null,Summer_2003], [CS004,null,Fall_2003]]|4  |Frodo Baggins   |2003          |
|[5afcc674ef01bb656bfe606c]|[[CS009,null,Spring_2005], [CS006,null,Summer_2005], [CS004,null,Fall_2005]]|6  |Tony Stark      |2005          |
|[5afcc674ef01bb656bfe606d]|[[CS001,null,Spring_2007], [CS003,null,Summer_2007], [CS009,null,Fall_2007]]|8  |Cerci Lannister |2007          |
|[5afcc674ef01bb656bfe606e]|[[CS006,null,Spring_2008], [CS007,null,Summer_2008], [CS009,null,Fall_2008]]|9  |Wonder Woman    |2008          |
|[5afcc674ef01bb656bfe606f]|[[CS010,null,Spring_2009], [CS002,null,Summer_2009], [CS007,null,Fall_2009]]|10 |Hermione Granger|2009          |
|[5afcc674ef01bb656bfe6070]|[[CS001,null,Spring_2010], [CS002,null,Summer_2010], [CS005,null,Fall_2010]]|11 |Peter Parker    |2010          |
+--------------------------+----------------------------------------------------------------------------+---+----------------+--------------+
There we go!! Black Panther is now a part of the students collection.
More Options
We can use additional options to process larger datasets more efficiently. Here is the complete documentation.
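For instance, a ReadConfig can override settings such as the collection name or read preference for a single load. A sketch (readPreference only matters when reading from a replica set):

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Override the collection and read preference for this load only;
// unspecified settings fall back to the session's defaults.
val readConfig = ReadConfig(
    Map("collection" -> "students", "readPreference.name" -> "secondaryPreferred"),
    Some(ReadConfig(spark.sparkContext)))
val studentsFromSecondary = MongoSpark.load(spark, readConfig)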
Run with spark-submit
You can run the same job using spark-submit.
First, let’s go ahead and build the jar.
Pavans-MacBook-Pro:Spark_Mongo_Example pavanpkulkarni$ gradle clean build
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :compileScala
Pruning sources from previous analysis, due to incompatible CompileSetup.
BUILD SUCCESSFUL in 25s
3 actionable tasks: 3 executed
We now have build/libs/Spark_Mongo_Example-1.0.jar ready for deployment.
Run the spark-submit command as below.
Pavans-MacBook-Pro:Spark_Mongo_Example pavanpkulkarni$ spark-submit --master local[4] --verbose --class com.pavanpkulkarni.mongo.SparkScalaMongo build/libs/Spark_Mongo_Example-1.0.jar
Using properties file: null
Parsed arguments:
master local[4]
deployMode null
executorMemory null
executorCores null
totalExecutorCores null
propertiesFile null
driverMemory null
driverCores null
driverExtraClassPath null
driverExtraLibraryPath null
driverExtraJavaOptions null
supervise false
queue null
numExecutors null
files null
pyFiles null
archives null
mainClass com.pavanpkulkarni.mongo.SparkScalaMongo
primaryResource file:/Users/pavanpkulkarni/Documents/workspace/Spark_Mongo_Example/build/libs/Spark_Mongo_Example-1.0.jar
name com.pavanpkulkarni.mongo.SparkScalaMongo
childArgs []
jars null
packages null
packagesExclusions null
repositories null
verbose true
Spark properties used, including those specified through
--conf and those from the properties file null:
Main class:
com.pavanpkulkarni.mongo.SparkScalaMongo
Arguments:
System properties:
(SPARK_SUBMIT,true)
(spark.app.name,com.pavanpkulkarni.mongo.SparkScalaMongo)
(spark.jars,*********(redacted))
(spark.submit.deployMode,client)
(spark.master,local[4])
Classpath elements:
file:/Users/pavanpkulkarni/Documents/workspace/Spark_Mongo_Example/build/libs/Spark_Mongo_Example-1.0.jar
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/05/16 21:37:55 INFO SparkContext: Running Spark version 2.2.1
18/05/16 21:37:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/05/16 21:37:56 INFO SparkContext: Submitted application: Spark_Mongo
18/05/16 21:37:56 INFO SecurityManager: Changing view acls to: pavanpkulkarni
18/05/16 21:37:56 INFO SecurityManager: Changing modify acls to: pavanpkulkarni
18/05/16 21:37:56 INFO SecurityManager: Changing view acls groups to:
18/05/16 21:37:56 INFO SecurityManager: Changing modify acls groups to:
18/05/16 21:37:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(pavanpkulkarni); groups with view permissions: Set(); users with modify permissions: Set(pavanpkulkarni); groups with modify permissions: Set()
18/05/16 21:37:56 INFO Utils: Successfully started service 'sparkDriver' on port 51168.
18/05/16 21:37:56 INFO SparkEnv: Registering MapOutputTracker
.
.
.
.
.
18/05/16 21:38:03 INFO DAGScheduler: ResultStage 3 (show at SparkScalaMongo.scala:47) finished in 0.029 s
18/05/16 21:38:03 INFO DAGScheduler: Job 3 finished: show at SparkScalaMongo.scala:47, took 0.042215 s
+--------------------------+----------------------------------------------------------------------------+---+----------------+--------------+
|_id |courses_registered |id |name |year_graduated|
+--------------------------+----------------------------------------------------------------------------+---+----------------+--------------+
|[5afcc577aebca2bc98a7135e]|[[null,CS003,Spring_2011], [null,CS006,Summer_2011], [null,CS009,Fall_2011]]|12 |Black Panther |2011 |
|[5afcc674ef01bb656bfe6066]|[[CS003,null,Spring_2002], [CS004,null,Summer_2002], [CS005,null,Fall_2002]]|3 |Haan Solo |2002 |
|[5afcc674ef01bb656bfe6067]|[[CS003,null,Spring_2002], [CS004,null,Summer_2002], [CS005,null,Fall_2002]]|2 |Ned Stark |2002 |
|[5afcc674ef01bb656bfe6068]|[[CS001,null,Spring_2001], [CS002,null,Summer_2001], [CS001,null,Fall_2001]]|1 |Tom Riddle |2001 |
|[5afcc674ef01bb656bfe6069]|[[CS004,null,Spring_2004], [CS005,null,Summer_2004], [CS003,null,Fall_2004]]|5 |Sheldon Cooper |2004 |
|[5afcc674ef01bb656bfe606a]|[[CS004,null,Spring_2006], [CS005,null,Summer_2006], [CS003,null,Fall_2006]]|7 |Stephan Hawkings|2006 |
|[5afcc674ef01bb656bfe606b]|[[CS009,null,Spring_2003], [CS010,null,Summer_2003], [CS004,null,Fall_2003]]|4 |Frodo Baggins |2003 |
|[5afcc674ef01bb656bfe606c]|[[CS009,null,Spring_2005], [CS006,null,Summer_2005], [CS004,null,Fall_2005]]|6 |Tony Stark |2005 |
|[5afcc674ef01bb656bfe606d]|[[CS001,null,Spring_2007], [CS003,null,Summer_2007], [CS009,null,Fall_2007]]|8 |Cerci Lannister |2007 |
|[5afcc674ef01bb656bfe606e]|[[CS006,null,Spring_2008], [CS007,null,Summer_2008], [CS009,null,Fall_2008]]|9 |Wonder Woman |2008 |
|[5afcc674ef01bb656bfe606f]|[[CS010,null,Spring_2009], [CS002,null,Summer_2009], [CS007,null,Fall_2009]]|10 |Hermione Granger|2009 |
|[5afcc674ef01bb656bfe6070]|[[CS001,null,Spring_2010], [CS002,null,Summer_2010], [CS005,null,Fall_2010]]|11 |Peter Parker |2010 |
+--------------------------+----------------------------------------------------------------------------+---+----------------+--------------+
18/05/16 21:38:03 INFO ContextCleaner: Cleaned accumulator 99
18/05/16 21:38:03 INFO SparkContext: Invoking stop() from shutdown hook
18/05/16 21:38:03 INFO BlockManagerInfo: Removed broadcast_6_piece0 on 10.0.0.67:51169 in memory (size: 411.0 B, free: 366.3 MB)
18/05/16 21:38:03 INFO MongoClientCache: Closing MongoClient: [127.0.0.1:27017]
18/05/16 21:38:03 INFO connection: Closed connection [connectionId{localValue:2, serverValue:57}] to 127.0.0.1:27017 because the pool has been closed.
18/05/16 21:38:03 INFO BlockManagerInfo: Removed broadcast_7_piece0 on 10.0.0.67:51169 in memory (size: 6.0 KB, free: 366.3 MB)
18/05/16 21:38:03 INFO BlockManagerInfo: Removed broadcast_4_piece0 on 10.0.0.67:51169 in memory (size: 411.0 B, free: 366.3 MB)
18/05/16 21:38:03 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 10.0.0.67:51169 in memory (size: 6.0 KB, free: 366.3 MB)
18/05/16 21:38:03 INFO BlockManagerInfo: Removed broadcast_5_piece0 on 10.0.0.67:51169 in memory (size: 2.7 KB, free: 366.3 MB)
18/05/16 21:38:03 INFO ContextCleaner: Cleaned accumulator 98
18/05/16 21:38:03 INFO SparkUI: Stopped Spark web UI at http://10.0.0.67:4040
18/05/16 21:38:03 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/05/16 21:38:03 INFO MemoryStore: MemoryStore cleared
18/05/16 21:38:03 INFO BlockManager: BlockManager stopped
18/05/16 21:38:03 INFO BlockManagerMaster: BlockManagerMaster stopped
18/05/16 21:38:03 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/05/16 21:38:03 INFO SparkContext: Successfully stopped SparkContext
18/05/16 21:38:03 INFO ShutdownHookManager: Shutdown hook called
18/05/16 21:38:03 INFO ShutdownHookManager: Deleting directory /private/var/folders/nb/ygmwx13x6y1_9pyzg1_82w440000gn/T/spark-7172f78f-0852-43f6-887d-a2dce687af3f
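N.B.: This assumes the connector classes are available on the classpath at runtime. If your jar does not bundle mongo-spark-connector, you can have spark-submit fetch it from Maven Central by adding --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2 to the command above.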
Being an avid Eclipse fan, I was experimenting with IntelliJ for this project. I seem to like IntelliJ over Eclipse for its ease of importing Gradle dependencies and of setting up projects. Having said that, I will continue using IntelliJ until the day I feel… “nah… Eclipse could’ve done this better!!!”
Email