Step 4: Create the Spark Data Pipeline
Now we create the pipeline using PySpark. Given the feature lists you pass in, the pipeline transforms and vectorizes your data so it is ready for modeling. I referenced the “Extracting, transforming and selecting features” section of the Apache Spark documentation a lot for this pipeline and project.
Below is a helper function that selects which of the numeric features to standardize based on each feature’s kurtosis or skew. The current defaults for upper_skew and lower_skew are just general guidelines (depending on where you read), but you can modify them as desired.
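The helper could look something like the sketch below. This is not the article’s exact code: for illustration it computes skew with pandas (e.g. on the result of spark_df.toPandas()), and the function signature and defaults are assumptions.

```python
import pandas as pd

def select_features_to_scale(pdf, lower_skew=-2.0, upper_skew=2.0):
    """Return the numeric columns whose skew falls outside
    [lower_skew, upper_skew] and should therefore be standardized."""
    selected = []
    for col in pdf.select_dtypes(include=["number"]).columns:
        skew = pdf[col].skew()
        if skew > upper_skew or skew < lower_skew:
            selected.append(col)
    return selected
```

On a Spark dataframe you could compute the same statistic without converting to pandas by aggregating with pyspark.sql.functions.skewness.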
Create Pipeline
Now we’ll get into the actual data pipeline. The feature list selection could be made more dynamic rather than listing out each feature, but for this small dataset I just left it as is with cat_features, num_features, and label. Selecting features by type can be done similarly to the select_features_to_scale helper function, using something like list(spark_df.toPandas().select_dtypes(include=['object']).columns), which returns a list of all the columns in your Spark dataframe that are object or string type.
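As a sketch of that idea (using a small pandas frame standing in for spark_df.toPandas(); the column names here are made up, not the article’s dataset):

```python
import pandas as pd

# Stand-in for spark_df.toPandas() on a hypothetical dataset
pdf = pd.DataFrame({
    "workclass": ["private", "gov"],   # string -> categorical feature
    "age": [25, 38],                   # numeric feature
    "income": ["<=50K", ">50K"],       # string label
})

# Split column names into categorical and numeric lists by dtype
cat_features = list(pdf.select_dtypes(include=["object"]).columns)
num_features = list(pdf.select_dtypes(include=["number"]).columns)
```

Note that the label column would also show up in cat_features with this approach, so you would still remove it from the list by hand.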
The first thing we want to do is create an empty list called stages. This will contain each step the data pipeline needs to complete all the transformations within our pipeline. I print out each of the stages after the pipeline so you can see the sequential steps from my code as a list.
The second part is a basic loop that goes through each categorical feature in our list cat_features, then indexes and one-hot encodes those features. StringIndexer encodes a categorical feature as a feature index, with the most frequent label (by count) assigned feature index 0, and so on. I preview the transformed dataframe after the pipeline, in Step 5, where you can see each feature index created from the categorical features. For more information and a basic example of StringIndexer, see the Spark documentation.
Within the loop we also do some one-hot encoding (OHE) using OneHotEncoderEstimator. This estimator only takes a label index, so if you have categorical data (objects or strings) you have to run StringIndexer first in order to pass a label index to the OHE estimator. One nice thing I found from looking at dozens of examples is that you can chain the StringIndexer output right into the OHE estimator using string_indexer.getOutputCol(). If you have a lot of features to transform, give some thought to the output column names (OutputCol), because you can’t just overwrite existing feature names, so get creative. We append all of those pipeline steps within the loop to our pipeline list stages.
Next we use StringIndexer again, this time on our label feature, or dependent variable. Then we move on to scaling the numeric variables, using the select_features_to_scale helper function from above. Once that list is selected, we vectorize those features using VectorAssembler and then standardize the features within that vector using StandardScaler. Then we append those steps to our ongoing pipeline list stages.
We can see all the steps within our pipeline by looking at the stages list that we’ve been sequentially building.