spark, scala

Pivoting data with Spark

One of the common data engineering tasks is taking a deep dataset and turning into a wide dataset with some sort of aggregation function. Let's take a quick look at an example dataset to see why we would want to perform this action.

Our goal: To determine if there are any trends in the average Miles Per Gallon for new cars in the last few years.

The dataset can be downloaded from the fueleconomy.gov website.

The first step is to simply load the dataset. We'll go ahead and infer the schema at this point, so there will be no need to manually declare the data types.

scala> val df = spark.read.option("header","true").option("inferschema","true").format("csv").load("vehicles.csv.gz")
df: org.apache.spark.sql.DataFrame = [barrels08: double, barrelsA08: double ... 81 more fields]

scala> df.columns.mkString(", ")
res65: String = barrels08, barrelsA08, charge120, charge240, city08, city08U, cityA08, cityA08U, cityCD, cityE, cityUF, co2, co2A, co2TailpipeAGpm, co2TailpipeGpm, comb08, comb08U, combA08, combA08U, combE, combinedCD, combinedUF, cylinders, displ, drive, engId, eng_dscr, feScore, fuelCost08, fuelCostA08, fuelType, fuelType1, ghgScore, ghgScoreA, highway08, highway08U, highwayA08, highwayA08U, highwayCD, highwayE, highwayUF, hlv, hpv, id, lv2, lv4, make, model, mpgData, phevBlended, pv2, pv4, range, rangeCity, rangeCityA, rangeHwy, rangeHwyA, trany, UCity, UCityA, UHighway, UHighwayA, VClass, year, youSaveSpend, guzzler, trans_dscr, tCharger, sCharger, atvType, fuelType2, rangeA, evMotor, mfrCode, c240Dscr, charge240b, c240bDscr, createdOn, modifiedOn, startStop, phevCity, phevHwy, phevComb

You'll notice that there's a lot of columns in this dataset that we don't need, as well as data dating back to the 80s. So we'll select just the few columns that matter, as well as the records for the past few years. We'll also clean up the column names a little bit, too.

val carDF = df.select("year","comb08","VClass").filter($"VClass".contains("Cars")).filter($"year" === "2015" || $"year" === "2016" || $"year" === "2017" || $"year" === "2018").withColumnRenamed("comb08","mpg").withColumnRenamed("VClass","class")

scala> carDF.show(20,false)
+----+---+----------------+
|year|mpg|class           |
+----+---+----------------+
|2015|19 |Subcompact Cars |
|2015|23 |Compact Cars    |
|2015|30 |Midsize Cars    |
|2015|32 |Midsize Cars    |
|2015|29 |Midsize Cars    |
|2015|21 |Large Cars      |
|2015|18 |Large Cars      |
|2015|26 |Subcompact Cars |
|2015|24 |Compact Cars    |
|2015|19 |Compact Cars    |
|2015|22 |Compact Cars    |
|2015|22 |Midsize Cars    |
|2015|19 |Minicompact Cars|
|2015|18 |Minicompact Cars|
|2015|18 |Minicompact Cars|
|2015|17 |Minicompact Cars|
|2015|27 |Subcompact Cars |
|2015|21 |Compact Cars    |
|2015|18 |Midsize Cars    |
|2015|18 |Large Cars      |
+----+---+----------------+
only showing top 20 rows

Let's now take a look at the data we're going to pivot, using the correct group by and aggregation. I'm also rounding the mpg to 2 decimal places, as 8 is probably a few too many!

scala> carDF.groupBy("class","year").agg(round(avg("mpg"),2) as "mpg").sort("class","year").show(20,false)
+----------------+----+-----+
|class           |year|mpg  |
+----------------+----+-----+
|Compact Cars    |2015|27.16|
|Compact Cars    |2016|27.25|
|Compact Cars    |2017|27.89|
|Compact Cars    |2018|26.81|
|Large Cars      |2015|26.11|
|Large Cars      |2016|29.9 |
|Large Cars      |2017|27.66|
|Large Cars      |2018|22.31|
|Midsize Cars    |2015|26.43|
|Midsize Cars    |2016|27.43|
|Midsize Cars    |2017|28.61|
|Midsize Cars    |2018|26.03|
|Minicompact Cars|2015|24.79|
|Minicompact Cars|2016|24.35|
|Minicompact Cars|2017|25.11|
|Minicompact Cars|2018|22.8 |
|Subcompact Cars |2015|25.59|
|Subcompact Cars |2016|26.64|
|Subcompact Cars |2017|25.2 |
|Subcompact Cars |2018|24.32|
+----------------+----+-----+

The group by looks perfect, so next up is to perform the actual pivot, using the year as our pivot column. This will give us a new dataframe with the year as our columns.

scala> carDF.groupBy("class").pivot("year").agg(round(avg("mpg"),2)).sort("class").show(20,false)
+----------------+-----+-----+-----+-----+
|class           |2015 |2016 |2017 |2018 |
+----------------+-----+-----+-----+-----+
|Compact Cars    |27.16|27.25|27.89|26.81|
|Large Cars      |26.11|29.9 |27.66|22.31|
|Midsize Cars    |26.43|27.43|28.61|26.03|
|Minicompact Cars|24.79|24.35|25.11|22.8 |
|Subcompact Cars |25.59|26.64|25.2 |24.32|
+----------------+-----+-----+-----+-----+

After reviewing our result set, it would appear that the Miles Per Gallon have dropped for every car class in our dataset in 2018! So our next step would be to formulate a hypothesis for more thorough testing, based on that simple observation.

Author image

About James Conner

Scuba dive master, wildlife photographer, anthropologist, programmer, electronics tinkerer and big data expert.