
Joining Data Frames in Spark SQL

SQL Joins

Data

The data that I'm using for this test comes from Kaggle's Titanic project. The goal of the Titanic project is to build a machine learning model that predicts which passengers survived. In this post, I'll just be using the data as sample material to illustrate joins in Spark.


Spark CSV Module

Since the data is in CSV format, there are a couple of ways to deal with it. The first method is to simply import the data with textFile, then map a split over each line using the comma as a delimiter. The problem with this method is that the Name field in test.csv is formatted as "<lastname>, <salutation> <firstname> <middlename> (<alias>)", with double quotes interspersed throughout. Parsing everything correctly would take a while, and that isn't the exercise for this post.
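
For reference, here's a minimal sketch of that naive approach (using the spark-shell's built-in sc SparkContext and the same file path used later in this post); it's exactly what falls apart on the quoted Name field:

val rawRDD = sc.textFile("/mnt/data/titanic/test.csv")
// Naive comma split: a Name like "Kelly, Mr. James" gets torn into two fields
val naiveFields = rawRDD.map(_.split(","))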

Since we're just looking to parse the datasets quickly for the join example, let's use the spark-csv module to perform a simple import without fuss.

mkdir /opt/spark/spark-modules && cd /opt/spark/spark-modules
git clone https://github.com/databricks/spark-csv.git
cd /opt/spark/spark-modules/spark-csv && sbt/sbt package

Starting Spark Shell

After the spark-csv module is built, make sure to include the jars and packages when starting up the spark-shell.

/opt/spark/bin/spark-shell --master spark://<masterip>:7077 --jars /opt/spark/spark-modules/spark-csv/target/scala-2.11/spark-csv_2.11-1.1.0.jar --packages com.databricks:spark-csv_2.11:1.1.0

Join the Data

1) Load Data

The spark-csv module is fairly simple to call, and we'll want to use the header option, since the CSV files all contain headers.
val testCSV = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/mnt/data/titanic/test.csv")

val genmodCSV = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/mnt/data/titanic/gendermodel.csv")
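
A quick sanity check after loading (the exact rows will depend on the dataset, but every column will come back as a string at this point, as discussed in the next step):

// Inspect the inferred schema and a few sample rows
testCSV.printSchema()
testCSV.show(5)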


2) Fix Data Types

The downside to using the spark-csv module is that while it creates a Data Frame with a schema, it cannot auto-detect the field data types. As a result, the generated Data Frame consists entirely of string columns. In order to resolve this, we need to create new Data Frames containing cast data from the original Data Frames.

To start with, create a few user defined functions (UDFs) that take in strings and convert them to the desired data types. Then it's a simple matter of creating a new Data Frame with the numeric fields cast from the original; columns that should stay strings (Name, Sex, Ticket, Cabin, Embarked) don't need a withColumn call at all and carry over unchanged.

import org.apache.spark.sql.functions._

// Convert the string representation of each field to the target numeric type
val toInt    = udf[Int, String](_.toInt)
val toDouble = udf[Double, String](_.toDouble)
val toFloat  = udf[Float, String](_.toFloat)
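
One caveat worth flagging: these UDFs will throw on missing values, and the Titanic data does contain blank Age and Fare entries. A null-safe variant (toFloatSafe is my own name here, not something spark-csv provides) might return an Option instead:

// Returns None (null in the Data Frame) for null or empty strings
val toFloatSafe = udf[Option[Float], String] { s =>
  Option(s).filter(_.trim.nonEmpty).map(_.toFloat)
}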

val testDF = testCSV.
  withColumn("PassengerId", toInt(testCSV("PassengerId"))).
  withColumn("Pclass", toInt(testCSV("Pclass"))).
  withColumn("Age", toFloat(testCSV("Age"))).
  withColumn("SibSp", toInt(testCSV("SibSp"))).
  withColumn("Parch", toInt(testCSV("Parch"))).
  withColumn("Fare", toFloat(testCSV("Fare")))

val genmodDF = genmodCSV.
  withColumn("PassengerId", toInt(genmodCSV("PassengerId"))).
  withColumn("Survived", toInt(genmodCSV("Survived")))
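
It's worth confirming that the casts took effect before joining:

// The converted columns should now report integer/float instead of string
testDF.printSchema()
genmodDF.printSchema()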

3) Naive Inner Join

At this point, we're ready to try a simple join, but this is where the immaturity of Spark SQL shows. When performing an inner join of the `testDF` and `genmodDF` Data Frames, you'll notice that the PassengerId field appears twice: the join keeps both copies of the join key. If you then try to query the PassengerId field, Spark raises an error complaining that the field name is "ambiguous".
val joinedDF = testDF.join(genmodDF, testDF("PassengerId") === genmodDF("PassengerId"), "inner")

scala> joinedDF.printSchema
root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)

scala> joinedDF.select("PassengerId").show()
org.apache.spark.sql.AnalysisException: Reference 'PassengerId' is ambiguous

4) Explicit Inner Join

In order to reference the PassengerId field, we have to make the reference unique. Aliasing each Data Frame through Spark's DataFrame DSL makes it easy to distinguish the two PassengerId fields from each other, and as a result you can query both fields simultaneously.
val joinedDF = testDF.as('a).join(genmodDF.as('b), $"a.PassengerId" === $"b.PassengerId")

joinedDF.select($"a.PassengerId", $"b.PassengerId").take(10)
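
If you're on Spark 1.4 or later, there's also a using-column join, which keeps only a single PassengerId column in the result so no aliasing is needed; a quick sketch:

// Join on the shared column name; the result contains one PassengerId column
val joinedUsing = testDF.join(genmodDF, "PassengerId")
joinedUsing.select("PassengerId", "Name", "Survived").show(10)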

5) SQL Context Join

And finally, we can run SQL statements through the Spark SQL Context. The Data Frames first have to be registered as temporary tables so they can be referenced by name; after that, both the implicit comma-style join and the explicit ANSI JOIN syntax are supported, as can be seen below.
val resultset = sql("SELECT testDF.PassengerID, testDF.Name, genmodDF.Survived FROM testDF, genmodDF WHERE testDF.PassengerID = genmodDF.PassengerID").take(30)

or

val resultset = sql("SELECT testDF.PassengerID, testDF.Name, genmodDF.Survived FROM testDF LEFT OUTER JOIN genmodDF ON testDF.PassengerID = genmodDF.PassengerID").take(30)
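
Note that take(30) returns an Array[Row]; if you'd rather keep working with a Data Frame (to filter, aggregate, or show results), hold onto the query itself instead, for example with the explicit INNER JOIN form:

// Keep the result as a Data Frame rather than collecting rows to the driver
val resultDF = sqlContext.sql("SELECT testDF.PassengerId, testDF.Name, genmodDF.Survived FROM testDF INNER JOIN genmodDF ON testDF.PassengerId = genmodDF.PassengerId")
resultDF.show(10)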