spark, scala, ML, Machine Learning

Using Spark, Scala and XGBoost On The Titanic Dataset from Kaggle

The Titanic: Machine Learning from Disaster competition on Kaggle is an excellent resource for anyone wanting to dive into Machine Learning. There are forums where you can request help and review solutions that were written in a variety of languages.

This particular Machine Learning (ML) challenge is easy to understand (predict who survived), and also requires many of the standard tasks typically found in data science such as imputation, feature engineering, hyperparameterization, cross validation and binary classification evaluation.

Since there are plenty of examples out on the interwebs for the Titanic problem using Python and R, I decided to use a combination of technologies that are more typical of productionized environments. Apache Spark for the processing engine, Scala for the programming language, and XGBoost for the classification algorithm. I wrote the code using Jupyter Notebooks and connected to the Spark cluster with Apache Toree.

So let's begin by importing the dependencies and loading the data files.

// Import dependencies
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.feature.{Imputer, ImputerModel}
import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, IndexToString} 
import org.apache.spark.ml.feature.{VectorAssembler, VectorIndexer}
import org.apache.spark.ml.feature.{Bucketizer,Normalizer}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import ml.dmlc.xgboost4j.scala.spark.{XGBoostEstimator, XGBoostClassificationModel}
import scala.collection.mutable
// Setup the Scala syntax for Spark SQL
val sparkImplicits = spark
import sparkImplicits.implicits._

The data files can be downloaded from the Titanic site on Kaggle.

// Load the csv data files
val trainingDF = (spark.read
  .option("header","true")
  .option("inferSchema","true")
  .format("csv")
  .load("data/Titanic/train.csv"))
  
val testingDF = (spark.read
  .option("header","true")
  .option("inferSchema","true")
  .format("csv")
  .load("data/Titanic/test.csv"))

Onwards to feature engineering ... This is where things begin to get interesting. If you peruse the dataset, you'll find we can extract the person's honorific title from the Name column, which will probably be useful. It turns out there are a variety of titles, ranging from the standard Mister and Madam to titles of Nobility and Military rank. Would a noble or officer be more likely to survive or not?

We can also create some buckets for family sizes as well by adding up the number of siblings, parents and children associated with the passenger. That information is located in the SibSp, and Parch columns. Maybe that's important, too.

And finally, are Mothers and Children good predictors for survivability? Let's create those features and find out! This information can be gleaned by looking at the Parch, Age and Sex columns, in addition to the newly engineered Miss column.

// Feature Engineering Step:
// Surname Regex: Pull last name from the Name record
// Honorific Regex: Pull title from the Name column.  Create a column per title.
// Family Size: Add the SibSp and Parch columns, and a 1 for self
// Family Buckets: Singleton = 1, SmallFam = 2 to 4, LargeFam > 5
// Child feature: Is the person 18 or under
// Mother feature: 16 or older, female, Honorific other than "Miss", and Parch greater than 0

val trainFeatures = (trainingDF
    .withColumn("Surname", regexp_extract($"Name","([\\w ']+),",1))
    .withColumn("Honorific", regexp_extract($"Name","(.*?)([\\w]+?)[.]",2))
    .withColumn("Mil", when($"Honorific" === "Col" or 
        $"Honorific" === "Major" or 
        $"Honorific" === "Capt", 1).otherwise(0))
    .withColumn("Doc", when($"Honorific" === "Dr", 1).otherwise(0))
    .withColumn("Rev", when($"Honorific" === "Rev", 1).otherwise(0))
    .withColumn("Nob", when($"Honorific" === "Sir" or
        $"Honorific" === "Countess" or
        $"Honorific" === "Count" or
        $"Honorific" === "Duke" or
        $"Honorific" === "Duchess" or
        $"Honorific" === "Jonkheer" or
        $"Honorific" === "Don" or
        $"Honorific" === "Dona" or
        $"Honorific" === "Lord" or
        $"Honorific" === "Lady" or
        $"Honorific" === "Earl" or
        $"Honorific" === "Baron", 1).otherwise(0))
    .withColumn("Mr", when($"Honorific" === "Mr", 1).otherwise(0))
    .withColumn("Mrs", when($"Honorific" === "Mrs" or 
        $"Honorific" === "Mme", 1).otherwise(0))
    .withColumn("Miss", when($"Honorific" === "Miss" or 
        $"Honorific" === "Mlle", 1).otherwise(0))
    .withColumn("Mstr", when($"Honorific" === "Master", 1).otherwise(0))
    .withColumn("TotalFamSize",$"SibSp"+$"Parch"+1)
    .withColumn("Singleton", when($"TotalFamSize" === 1, 1).otherwise(0))
    .withColumn("SmallFam", when($"TotalFamSize" <= 4 && 
        $"TotalFamSize" > 1, 1).otherwise(0))
    .withColumn("LargeFam", when($"TotalFamSize" >= 5, 1).otherwise(0))
    .withColumn("Child", when($"Age" <= 18, 1).otherwise(0))
    .withColumn("Mother", when($"Age" > 15 && 
        $"Parch" > 0 && 
        $"Miss" === 0 && 
        $"Sex" === "female",1).otherwise(0)))

There's a bit of data missing in some of the columns. The Cabin column, for example, is full of gaps. It might be useful if we get a map of the ship and begin creating features using a combination of Cabin and Pclass and Fare. for the moment, let's just concentrate on the reasonable imputation of data. The Embarked column seems to be a good place to start.

// Perform discovery on missing data in Embarked column.
// Create the temp table view so we can perform spark.sql queries on the DF
trainFeatures.createOrReplaceTempView("trainFeatures")

// Explore the data
(trainFeatures
  .groupBy("Pclass","Embarked")
  .agg(count("*"),avg("Fare"),min("Fare"),max("Fare"),stddev("Fare"))
  .orderBy("Pclass","Embarked")
  .show())

-------------------------------------------------------------------------------------------------------
+------+--------+--------+------------------+---------+---------+------------------+
|Pclass|Embarked|count(1)|         avg(Fare)|min(Fare)|max(Fare)| stddev_samp(Fare)|
+------+--------+--------+------------------+---------+---------+------------------+
|     1|    null|       2|              80.0|     80.0|     80.0|               0.0|
|     1|       C|      85|104.71852941176469|    26.55| 512.3292|  99.0939349696501|
|     1|       Q|       2|              90.0|     90.0|     90.0|               0.0|
|     1|       S|     127| 70.36486220472443|      0.0|    263.0|58.811277761795566|
|     2|       C|      17|25.358335294117644|     12.0|  41.5792|11.345067090697457|
|     2|       Q|       3|             12.35|    12.35|    12.35|               0.0|
|     2|       S|     164|20.327439024390245|      0.0|     73.5|13.630741099088103|
|     3|       C|      66|11.214083333333337|   4.0125|  22.3583| 4.871528353625736|
|     3|       Q|      72|11.183393055555557|     6.75|   29.125| 6.721676511682005|
|     3|       S|     353| 14.64408300283288|      0.0|    69.55|13.276608721649458|
+------+--------+--------+------------------+---------+---------+------------------+

// The missing data appears to be for 1st class passengers, and the fare was 80
// Let's create a median view of the fares for 1st class passengers, using
// the percentile_approx method, and determine which Embark location is 
// closest to 80 in fare cost.
spark.sql("SELECT Pclass,Embarked,percentile_approx(Fare, 0.5) AS Median_Fare FROM trainFeatures WHERE Fare IS NOT NULL AND Pclass = 1 GROUP BY Pclass,Embarked").show()

-------------------------------------------------------------------------------------------------------
+------+--------+-----------+
|Pclass|Embarked|Median_Fare|
+------+--------+-----------+
|     1|    null|       80.0|
|     1|       Q|       90.0|
|     1|       C|    78.2667|
|     1|       S|       52.0|
+------+--------+-----------+

When we explore the Embarked column, you'll notice that there are two records with null values. Both are for first class passengers, and both paid 80 Pounds for their ticket.

So we then looked at the median fare paid by first class passengers for each embarkation port.

It seems reasonable to assume that the 2 missing values are 'C', since 78.2667 is closest to 80.0, so let's go ahead and impute those missing values.

// Impute Embarked column
// From the discovery above, the likely port is C, since the Median for C is closest to 80.
val trainEmbarked = trainFeatures.na.fill("C",Seq("Embarked"))

Now we're moving on to something a little more complex ... the missing data in the Age column. There's quite a bit of data missing in the Age column, but it seems likely that Age is going to be an excellent feature for predicting survival, so let's go ahead and do some discovery.

// Perform discovery on missing data in Age column
// Create the temp table view so we can perform spark.sql queries on the dataframe
trainEmbarked.createOrReplaceTempView("trainEmbarked")

// Explore the data
// Count nulls for each Honorific.  Some titles can imply age (miss,master,etc)
spark.sql("SELECT Honorific,count(*) as nullAge FROM trainEmbarked WHERE Age IS NULL GROUP BY Honorific").show()

-------------------------------------------------------------------------------------------------------
+---------+-------+
|Honorific|nullAge|
+---------+-------+
|     Miss|     36|
|   Master|      4|
|       Mr|    119|
|       Dr|      1|
|      Mrs|     17|
+---------+-------+

// Calculate the average age for the Honorific titles that have nulls
spark.sql("SELECT Honorific,round(avg(Age)) as avgAge FROM trainEmbarked WHERE Age IS NOT NULL AND Honorific IN ('Miss','Master','Mr','Dr','Mrs') GROUP BY Honorific").show()

-------------------------------------------------------------------------------------------------------
+---------+------+
|Honorific|avgAge|
+---------+------+
|     Miss|  22.0|
|   Master|   5.0|
|       Mr|  32.0|
|       Dr|  42.0|
|      Mrs|  36.0|
+---------+------+

Taking a look at the average Age, grouped by Honorofic gives us the values we're going to use for imputation. A cursory look seems reasonable ... if we were going to be a bit more rigorous, we'd look at the deviations and remove outliers, all that sort of thing, but let's keep this simple. Onwards to imputation! (I love that word ... "imputation")

What we're doing in the code below is creating a single column data frame for each Honorific we're imputing, along with a remainder data frame for all other honorifics. Afterwards, just union the data back together.

// Impute the missing Age values for the relevant Honorific columns and union the data back together
val trainMissDF = trainEmbarked.na.fill(22.0).where("Honorific = 'Miss'")
val trainMasterDF = trainEmbarked.na.fill(5.0).where("Honorific = 'Master'")
val trainMrDF = trainEmbarked.na.fill(32.0).where("Honorific = 'Mr'")
val trainDrDF = trainEmbarked.na.fill(42.0).where("Honorific = 'Dr'")
val trainMrsDF = trainEmbarked.na.fill(36.0).where("Honorific = 'Mrs'")
val trainRemainderDF = spark.sql("SELECT * FROM trainEmbarked WHERE Honorific NOT IN ('Miss','Master','Dr','Mr','Mrs')")
val trainCombinedDF = trainRemainderDF.union(trainMissDF).union(trainMasterDF).union(trainMrDF).union(trainDrDF).union(trainMrsDF)

Since we can't use StringType columns for creating our features vector later on, we need to convert the Sex and Embarked columns into numeric columns by indexing them.

// Convert the categorical (string) values into numeric values
val genderIndexer = new StringIndexer().setInputCol("Sex").setOutputCol("SexIndex")
val embarkIndexer = new StringIndexer().setInputCol("Embarked").setOutputCol("EmbarkIndex")

For grins and giggles, let's One Hot Encode these columns, too. This will give us binary values in our columns.

// Convert the numerical index columns into One Hot columns
// The One Hot columns are binary {0,1} values of the categories
val genderEncoder = new OneHotEncoder().setInputCol("SexIndex").setOutputCol("SexVec")
val embarkEncoder = new OneHotEncoder().setInputCol("EmbarkIndex").setOutputCol("EmbarkVec")

For the fares, we're going to just create simple buckets. We should probably do some more work to discover the optimum splits, but these seem to be a reasonable place to start.

// Create 8 buckets for the fares, turning a continuous feature into a discrete range
val fareSplits = Array(0.0,10.0,20.0,30.0,40.0,60.0,120.0,Double.PositiveInfinity)
val fareBucketize = new Bucketizer().setInputCol("Fare").setOutputCol("FareBucketed").setSplits(fareSplits)

Next up is the vector assembler. Inside the assembler is a list of all features we're going to use in our Machine Learning algorithm. Note that we haven't actually executed the assembler yet. That's coming up in the pipeline!

// Create a vector of the features.  
val assembler = new VectorAssembler().setInputCols(Array("Pclass", "SexVec", "Age", "SibSp", "Parch", "Fare", "FareBucketed", "EmbarkVec", "Mil", "Doc", "Rev", "Nob", "Mr", "Mrs", "Miss", "Mstr", "TotalFamSize", "Singleton", "SmallFam", "LargeFam", "Child", "Mother")).setOutputCol("features")

The pipeline is a where we string together the data prep tasks like the indexers, the encoders, and the vector assembler. Once the pipeline object is created, we have to fit and transform the data frame. The output of this is a new data frame which has the requisite features column, which is a VectorType.

// Create the features pipeline and data frame
// The order is important here, Indexers have to come before the encoders
val trainingFeaturesPipeline = (new Pipeline()
  .setStages(Array(genderIndexer,embarkIndexer,genderEncoder,embarkEncoder, fareBucketize, assembler)))

val trainingFeaturesDF = trainingFeaturesPipeline.fit(trainCombinedDF).transform(trainCombinedDF)

Standard procedure, let's create a training and testing data frame for the Machine Learning and evaluation processes.

// Now that the data has been prepared, let's split the dataset into a training and test dataframe
val Array(trainDF, testDF) = trainingFeaturesDF.randomSplit(Array(0.8, 0.2),seed = 12345)

Now we're getting into the algorithm ... This is the basic parameter map for XGBoost. We'll add some additional hyperparameters later, too.

// Create default param map for XGBoost
def get_param(): mutable.HashMap[String, Any] = {
    val params = new mutable.HashMap[String, Any]()
        params += "eta" -> 0.1
        params += "max_depth" -> 8
        params += "gamma" -> 0.0
        params += "colsample_bylevel" -> 1
        params += "objective" -> "binary:logistic"
        params += "num_class" -> 2
        params += "booster" -> "gbtree"
        params += "num_rounds" -> 20
        params += "nWorkers" -> 3
    return params
}

Create the XGBoost object, with the basic parameter map, and the Survived column as the label, and features for the features vector.

// Create an XGBoost Classifier
val xgb = new XGBoostEstimator(get_param().toMap).setLabelCol("Survived").setFeaturesCol("features")

And this is the hyperparameter grid for XGBoost. We're only going hyperparam the alpha, lambda and subSample parameters.

// XGBoost paramater grid
val xgbParamGrid = (new ParamGridBuilder()
  .addGrid(xgb.round, Array(1000))
  .addGrid(xgb.maxDepth, Array(16))
  .addGrid(xgb.maxBins, Array(2))
  .addGrid(xgb.minChildWeight, Array(0.2))
  .addGrid(xgb.alpha, Array(0.8, 0.9))
  .addGrid(xgb.lambda, Array(0.9, 1.0))
  .addGrid(xgb.subSample, Array(0.6, 0.65, 0.7))
  .addGrid(xgb.eta, Array(0.015))
  .build())

Let's create the XGBoost Pipeline object, which is going to be used in the cross validator.

// Create the XGBoost pipeline
val pipeline = new Pipeline().setStages(Array(xgb))

We'll also need a binary classifier evaluator for the cross validator. This is so the cross validator can determine which set of parameters produces the best model.

// Setup the binary classifier evaluator
val evaluator = (new BinaryClassificationEvaluator()
  .setLabelCol("Survived")
  .setRawPredictionCol("prediction")
  .setMetricName("areaUnderROC"))

Next we're creating the cross validation object! It's got the estimator pipeline (which was the XGBoost algorithm), the evaluator (the Binary Classifier), and the estimator Parameter Maps (the hyperparameter grid).

// Create the Cross Validation pipeline, using XGBoost as the estimator, the 
// Binary Classification evaluator, and xgbParamGrid for hyperparameters
val cv = (new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(xgbParamGrid)
  .setNumFolds(10))

And the moment we've all been waiting for ... creating the model by running the cross validation fit method on trainDF.

// Create the model by fitting the training data
val xgbModel = cv.fit(trainDF)

-------------------------------------------------------------------------------------------------------
Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=192.168.250.150, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=1}
==== serialized obj size 2562986
Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=192.168.250.150, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=1}
==== serialized obj size 2344538
Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=192.168.250.150, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=1}
==== serialized obj size 2602010
Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=192.168.250.150, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=1}
==== serialized obj size 2570690
Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=192.168.250.150, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=1}
==== serialized obj size 2521802
Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=192.168.250.150, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=1}

Once the best model has been found, let's see how it performs against the hold-out data frame testDF.

// Test the data by scoring the model
val results = xgbModel.transform(testDF)

But wait ... before showing you the results, let's take a look at all of the parameters that make up the best XGBoost model.

// Print out a copy of the parameters used by XGBoost
(xgbModel.bestModel.asInstanceOf[PipelineModel]
  .stages(0).asInstanceOf[XGBoostClassificationModel]
  .extractParamMap().toSeq.foreach(println))

-------------------------------------------------------------------------------------------------------
ParamPair(XGBoostClassificationModel_c522939b3d0f__max_bin,2)
ParamPair(XGBoostClassificationModel_c522939b3d0f__colsample_bytree,1.0)
ParamPair(XGBoostClassificationModel_c522939b3d0f__max_depth,16)
ParamPair(XGBoostClassificationModel_c522939b3d0f__colsample_bylevel,1.0)
ParamPair(XGBoostClassificationModel_c522939b3d0f__predictionCol,prediction)
ParamPair(XGBoostClassificationModel_c522939b3d0f__outputMargin,false)
ParamPair(XGBoostClassificationModel_c522939b3d0f__rate_drop,0.0)
ParamPair(XGBoostClassificationModel_c522939b3d0f__eta,0.015)
ParamPair(XGBoostClassificationModel_c522939b3d0f__sample_type,uniform)
ParamPair(XGBoostClassificationModel_c522939b3d0f__lambda_bias,0.0)
ParamPair(XGBoostClassificationModel_c522939b3d0f__rawPredictionCol,probabilities)
ParamPair(XGBoostClassificationModel_c522939b3d0f__labelCol,Survived)
ParamPair(XGBoostClassificationModel_c522939b3d0f__normalize_type,tree)
ParamPair(XGBoostClassificationModel_c522939b3d0f__subsample,0.65)
ParamPair(XGBoostClassificationModel_c522939b3d0f__booster,gbtree)
ParamPair(XGBoostClassificationModel_c522939b3d0f__alpha,0.9)
ParamPair(XGBoostClassificationModel_c522939b3d0f__use_external_memory,false)
ParamPair(XGBoostClassificationModel_c522939b3d0f__tree_method,auto)
ParamPair(XGBoostClassificationModel_c522939b3d0f__lambda,1.0)
ParamPair(XGBoostClassificationModel_c522939b3d0f__sketch_eps,0.03)
ParamPair(XGBoostClassificationModel_c522939b3d0f__scale_pos_weight,1.0)
ParamPair(XGBoostClassificationModel_c522939b3d0f__grow_policy,depthwise)
ParamPair(XGBoostClassificationModel_c522939b3d0f__max_delta_step,0.0)
ParamPair(XGBoostClassificationModel_c522939b3d0f__skip_drop,0.0)
ParamPair(XGBoostClassificationModel_c522939b3d0f__featuresCol,features)
ParamPair(XGBoostClassificationModel_c522939b3d0f__min_child_weight,0.2)
ParamPair(XGBoostClassificationModel_c522939b3d0f__gamma,0.0)

Now on to the evaluation ... first let's take a look at the error counts.

// Let's take a look at the Type 1 and Type 2 errors counts.
println(s"True Negative: ${results.select("*").where("prediction = 0 AND Survived = 0").count()}  True Positive: ${results.select("*").where("prediction = 1 AND Survived = 1").count()}")
println(s"False Negative: ${results.select("*").where("prediction = 0 AND Survived = 1").count()} False Positive: ${results.select("*").where("prediction = 1 AND Survived = 0").count()}")

-------------------------------------------------------------------------------------------------------
True Negative: 55  True Positive: 25
False Negative: 5 False Positive: 3

Not bad, but let's look at the records which were incorrectly scored. We can look through this data and attempt to discover any additional patterns or features that would help narrow this list down a bit more.

// Take a look at the records that were incorrectly predicted.
// Use these records to determine any additional patterns or
// insight to increase the accuracy of the model.
results.createOrReplaceTempView("results")
spark.sql("SELECT PassengerID as PID,Pclass,Sex,Age,SibSp,Parch,Honorific as Hon,TotalFamSize as Fam,Survived,prediction,probabilities FROM results where Survived != cast(prediction as int)").show(100)

+---+------+------+----+-----+-----+----+---+--------+----------+--------------------+
|PID|Pclass|   Sex| Age|SibSp|Parch| Hon|Fam|Survived|prediction|       probabilities|
+---+------+------+----+-----+-----+----+---+--------+----------+--------------------+
|730|     3|female|25.0|    1|    0|Miss|  2|       0|       1.0|[0.49238371849060...|
|138|     1|  male|37.0|    1|    0|  Mr|  2|       0|       1.0|[0.15266531705856...|
|188|     1|  male|45.0|    0|    0|  Mr|  1|       1|       0.0|[0.65776157379150...|
|570|     3|  male|32.0|    0|    0|  Mr|  1|       1|       0.0|[0.97534757852554...|
|623|     3|  male|20.0|    1|    1|  Mr|  3|       1|       0.0|[0.90683007240295...|
|763|     3|  male|20.0|    0|    0|  Mr|  1|       1|       0.0|[0.89590752124786...|
|840|     1|  male|32.0|    0|    0|  Mr|  1|       1|       0.0|[0.91530543565750...|
| 50|     3|female|18.0|    1|    0| Mrs|  2|       0|       1.0|[0.22403389215469...|
+---+------+------+----+-----+-----+----+---+--------+----------+--------------------+

And finally, let's take a look at the AUC. This code isn't perfect, but .89090 is an excellent start!

// What was the overall accuracy of the model, using AUC
evaluator.evaluate(results)

-------------------------------------------------------------------------------------------------------
0.8908045977011495
Author image

About James Conner

Scuba dive master, wildlife photographer, anthropologist, programmer, electronics tinkerer and big data expert.