spark, scala, ML, Machine Learning

Spark Vector of Vectors

I recently ran into a problem while creating a feature vector for a machine learning project. If the number of features in your DataFrame is too large, the JVM will crash during Catalyst's code generation because the generated class exceeds the JVM's constant pool limit of 65,536 entries.

There is a JIRA filed for this problem: SPARK-18016. The JIRA indicates a patch has been submitted to the Spark 2.3.0 codebase, but I downloaded and compiled the 2.3.0 snapshot and discovered that the problem still exists.

So what can you do if you run into this problem? You can create a "vector of vectors". In other words, create vectors from smaller groups of features using the VectorAssembler, then run the VectorAssembler again to create a new vector using the smaller vectors as its input columns. The output at the end of the process is still one big vector containing all of your feature data.

// IMPORT DEPENDENCIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
import org.apache.spark.ml.feature.VectorAssembler

// Create first example dataframe
val firstDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", "col6")

// Create second example dataframe
val secondDF = spark.createDataFrame(Seq(
  (1, 3, 2, 0, 4, 2, 8),
  (2, 3, 3, 2, 6, 5, 4),
  (3, 8, 5, 1, 2, 3, 5),
  (4, 9, 8, 2, 4, 9, 2),
  (5, 3, 4, 8, 0, 6, 2),
  (6, 3, 9, 8, 8, 9, 3)
)).toDF("uid", "col7", "col8", "col9", "colA", "colB", "colC")

// Create third example dataframe
val thirdDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "colD", "colE", "colF", "colG", "colH", "colI")

// Create fourth example dataframe
val fourthDF = spark.createDataFrame(Seq(
  (1, 3, 2, 0, 4, 2, 8),
  (2, 3, 3, 2, 6, 5, 4),
  (3, 8, 5, 1, 2, 3, 5),
  (4, 9, 8, 2, 4, 9, 2),
  (5, 3, 4, 8, 0, 6, 2),
  (6, 3, 9, 8, 8, 9, 3)
)).toDF("uid", "colJ", "colK", "colL", "colM", "colN", "colO")

// Display the schemas
firstDF.printSchema
secondDF.printSchema
thirdDF.printSchema
fourthDF.printSchema

// Display the dataframes
firstDF.show()
secondDF.show()
thirdDF.show()
fourthDF.show()

// CREATE ARRAYS OF COLUMN NAMES THAT WILL BE USED FOR EACH DF FEATURES VECTOR
val firstInCols = firstDF.columns.filter(_ != "uid")
val secondInCols = secondDF.columns.filter(_ != "uid")
val thirdInCols = thirdDF.columns.filter(_ != "uid")
val fourthInCols = fourthDF.columns.filter(_ != "uid")

// USE THE VECTOR ASSEMBLER TO CREATE THE PER DF VECTOR
val firstFeatureDF = new VectorAssembler().setInputCols(firstInCols).setOutputCol("first_features").transform(firstDF)
val secondFeatureDF = new VectorAssembler().setInputCols(secondInCols).setOutputCol("second_features").transform(secondDF)
val thirdFeatureDF = new VectorAssembler().setInputCols(thirdInCols).setOutputCol("third_features").transform(thirdDF)
val fourthFeatureDF = new VectorAssembler().setInputCols(fourthInCols).setOutputCol("fourth_features").transform(fourthDF)

// JOIN ID AND ALL FEATURES COLUMNS INTO A DF
val fullDF = firstFeatureDF.select("uid","first_features").join(secondFeatureDF.select("uid","second_features"), Seq("uid")).join(thirdFeatureDF.select("uid","third_features"), Seq("uid")).join(fourthFeatureDF.select("uid","fourth_features"), Seq("uid"))

// DISPLAY fullDF
fullDF.show()

// USE VECTOR ASSEMBLER TO JOIN ALL FEATURE COLUMNS
val vecInCols = fullDF.columns.filter(_ != "uid")
val vecDF = new VectorAssembler().setInputCols(vecInCols).setOutputCol("features").transform(fullDF)
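
// The two assembly stages can also be chained in an ml Pipeline, so the whole
// transformation is defined once and applied in a single fit/transform pass.
// This is a sketch of that alternative, reusing firstDF and firstInCols from
// above; the stage names are my own, not from the original walkthrough.
import org.apache.spark.ml.Pipeline

val subAssembler = new VectorAssembler()
  .setInputCols(firstInCols)
  .setOutputCol("first_features")
val finalAssembler = new VectorAssembler()
  .setInputCols(Array("first_features"))
  .setOutputCol("features")

// Stages run in order: per-group assembly first, then the final combine
val pipeline = new Pipeline().setStages(Array(subAssembler, finalAssembler))
val pipelineDF = pipeline.fit(firstDF).transform(firstDF)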

// DISPLAY THE SCHEMA
vecDF.printSchema

// DISPLAY vecDF
vecDF.select("uid","features").show(false)
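
If all of the wide columns already live in a single DataFrame, the same idea can be automated by chunking the column list and assembling each chunk into an intermediate vector before the final combine. The sketch below assumes that setup; the chunk size of 100 and the names assembleInChunks and chunk_features_N are arbitrary choices of mine, not part of the original example.

// Assemble an arbitrarily wide DataFrame in chunks to stay under the
// code-generation limit. featureCols is the full list of feature columns.
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.DataFrame

def assembleInChunks(df: DataFrame, featureCols: Array[String], chunkSize: Int = 100): DataFrame = {
  // Split the column list into groups of chunkSize and assemble each group
  val chunks = featureCols.grouped(chunkSize).zipWithIndex.toSeq
  val withChunks = chunks.foldLeft(df) { case (acc, (cols, i)) =>
    new VectorAssembler()
      .setInputCols(cols)
      .setOutputCol(s"chunk_features_$i")
      .transform(acc)
  }
  // Assemble the intermediate vectors into the final features vector
  val chunkCols = chunks.map { case (_, i) => s"chunk_features_$i" }.toArray
  new VectorAssembler()
    .setInputCols(chunkCols)
    .setOutputCol("features")
    .transform(withChunks)
}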



Output:

firstDF.printSchema
root
 |-- uid: integer (nullable = false)
 |-- col1: integer (nullable = false)
 |-- col2: integer (nullable = false)
 |-- col3: integer (nullable = false)
 |-- col4: integer (nullable = false)
 |-- col5: integer (nullable = false)
 |-- col6: integer (nullable = false)

secondDF.printSchema
root
 |-- uid: integer (nullable = false)
 |-- col7: integer (nullable = false)
 |-- col8: integer (nullable = false)
 |-- col9: integer (nullable = false)
 |-- colA: integer (nullable = false)
 |-- colB: integer (nullable = false)
 |-- colC: integer (nullable = false)

thirdDF.printSchema
root
 |-- uid: integer (nullable = false)
 |-- colD: integer (nullable = false)
 |-- colE: integer (nullable = false)
 |-- colF: integer (nullable = false)
 |-- colG: integer (nullable = false)
 |-- colH: integer (nullable = false)
 |-- colI: integer (nullable = false)

fourthDF.printSchema
root
 |-- uid: integer (nullable = false)
 |-- colJ: integer (nullable = false)
 |-- colK: integer (nullable = false)
 |-- colL: integer (nullable = false)
 |-- colM: integer (nullable = false)
 |-- colN: integer (nullable = false)
 |-- colO: integer (nullable = false)
firstDF.show()
+---+----+----+----+----+----+----+
|uid|col1|col2|col3|col4|col5|col6|
+---+----+----+----+----+----+----+
|  1|   1|   2|   3|   8|   4|   5|
|  2|   4|   3|   8|   7|   9|   8|
|  3|   6|   1|   9|   2|   3|   6|
|  4|   7|   8|   6|   9|   4|   5|
|  5|   9|   2|   7|   8|   7|   3|
|  6|   1|   1|   4|   2|   8|   4|
+---+----+----+----+----+----+----+

secondDF.show()
+---+----+----+----+----+----+----+
|uid|col7|col8|col9|colA|colB|colC|
+---+----+----+----+----+----+----+
|  1|   3|   2|   0|   4|   2|   8|
|  2|   3|   3|   2|   6|   5|   4|
|  3|   8|   5|   1|   2|   3|   5|
|  4|   9|   8|   2|   4|   9|   2|
|  5|   3|   4|   8|   0|   6|   2|
|  6|   3|   9|   8|   8|   9|   3|
+---+----+----+----+----+----+----+

thirdDF.show()
+---+----+----+----+----+----+----+
|uid|colD|colE|colF|colG|colH|colI|
+---+----+----+----+----+----+----+
|  1|   1|   2|   3|   8|   4|   5|
|  2|   4|   3|   8|   7|   9|   8|
|  3|   6|   1|   9|   2|   3|   6|
|  4|   7|   8|   6|   9|   4|   5|
|  5|   9|   2|   7|   8|   7|   3|
|  6|   1|   1|   4|   2|   8|   4|
+---+----+----+----+----+----+----+

fourthDF.show()
+---+----+----+----+----+----+----+
|uid|colJ|colK|colL|colM|colN|colO|
+---+----+----+----+----+----+----+
|  1|   3|   2|   0|   4|   2|   8|
|  2|   3|   3|   2|   6|   5|   4|
|  3|   8|   5|   1|   2|   3|   5|
|  4|   9|   8|   2|   4|   9|   2|
|  5|   3|   4|   8|   0|   6|   2|
|  6|   3|   9|   8|   8|   9|   3|
+---+----+----+----+----+----+----+
fullDF.show()
+---+--------------------+--------------------+--------------------+--------------------+
|uid|      first_features|     second_features|      third_features|     fourth_features|
+---+--------------------+--------------------+--------------------+--------------------+
|  1|[1.0,1.0,2.0,3.0,...|[1.0,3.0,2.0,0.0,...|[1.0,1.0,2.0,3.0,...|[1.0,3.0,2.0,0.0,...|
|  2|[2.0,4.0,3.0,8.0,...|[2.0,3.0,3.0,2.0,...|[2.0,4.0,3.0,8.0,...|[2.0,3.0,3.0,2.0,...|
|  3|[3.0,6.0,1.0,9.0,...|[3.0,8.0,5.0,1.0,...|[3.0,6.0,1.0,9.0,...|[3.0,8.0,5.0,1.0,...|
|  4|[4.0,7.0,8.0,6.0,...|[4.0,9.0,8.0,2.0,...|[4.0,7.0,8.0,6.0,...|[4.0,9.0,8.0,2.0,...|
|  5|[5.0,9.0,2.0,7.0,...|[5.0,3.0,4.0,8.0,...|[5.0,9.0,2.0,7.0,...|[5.0,3.0,4.0,8.0,...|
|  6|[6.0,1.0,1.0,4.0,...|[6.0,3.0,9.0,8.0,...|[6.0,1.0,1.0,4.0,...|[6.0,3.0,9.0,8.0,...|
+---+--------------------+--------------------+--------------------+--------------------+
vecDF.printSchema
root
 |-- uid: integer (nullable = false)
 |-- first_features: vector (nullable = true)
 |-- second_features: vector (nullable = true)
 |-- third_features: vector (nullable = true)
 |-- fourth_features: vector (nullable = true)
 |-- features: vector (nullable = true)

vecDF.select("uid","features").show(false)
+---+-------------------------------------------------------------------------------------------------+
|uid|features                                                                                         |
+---+-------------------------------------------------------------------------------------------------+
|1  |[1.0,2.0,3.0,8.0,4.0,5.0,3.0,2.0,0.0,4.0,2.0,8.0,1.0,2.0,3.0,8.0,4.0,5.0,3.0,2.0,0.0,4.0,2.0,8.0]|
|2  |[4.0,3.0,8.0,7.0,9.0,8.0,3.0,3.0,2.0,6.0,5.0,4.0,4.0,3.0,8.0,7.0,9.0,8.0,3.0,3.0,2.0,6.0,5.0,4.0]|
|3  |[6.0,1.0,9.0,2.0,3.0,6.0,8.0,5.0,1.0,2.0,3.0,5.0,6.0,1.0,9.0,2.0,3.0,6.0,8.0,5.0,1.0,2.0,3.0,5.0]|
|4  |[7.0,8.0,6.0,9.0,4.0,5.0,9.0,8.0,2.0,4.0,9.0,2.0,7.0,8.0,6.0,9.0,4.0,5.0,9.0,8.0,2.0,4.0,9.0,2.0]|
|5  |[9.0,2.0,7.0,8.0,7.0,3.0,3.0,4.0,8.0,0.0,6.0,2.0,9.0,2.0,7.0,8.0,7.0,3.0,3.0,4.0,8.0,0.0,6.0,2.0]|
|6  |[1.0,1.0,4.0,2.0,8.0,4.0,3.0,9.0,8.0,8.0,9.0,3.0,1.0,1.0,4.0,2.0,8.0,4.0,3.0,9.0,8.0,8.0,9.0,3.0]|
+---+-------------------------------------------------------------------------------------------------+
About James Conner

Scuba dive master, wildlife photographer, anthropologist, programmer, electronics tinkerer and big data expert.