I am trying to train a random forest classifier with sparkml
<https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier>
and am seeing that *the accuracy and other metrics are very bad (about the same as the
dataset's response distribution itself), yet when using the same data in a
random forest from the H2O
<http://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/drf.html#> module
I actually do get OK results* (~80% accuracy, 0.90 F2, etc., which at least
implies that it's learning *something*). This big difference makes me
suspect that this is not just a hyperparameter tuning issue. What could be
going on here?

My dataset consists mostly of categorical features (16 categorical, 2 integer),
with a binary response split of roughly 36%/64%.
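To put "about the same as the dataset's response distribution" into numbers: a model that always predicts the majority class already gets 64% accuracy on a 36/64 split. The plain-Python sketch below (no Spark needed) computes that baseline along with its weighted F-beta, assuming Spark's convention that a class which is never predicted gets precision, recall and F-measure of 0. `majority_baseline` and the class names are hypothetical, for illustration only.

```python
def majority_baseline(class_fractions, beta=2.0):
    """Accuracy and weighted F-beta of a classifier that always predicts
    the most frequent class, given each class's fraction of the data."""
    majority = max(class_fractions, key=class_fractions.get)
    accuracy = class_fractions[majority]
    weighted_f = 0.0
    for label, frac in class_fractions.items():
        if label == majority:
            precision = class_fractions[majority]  # every row is predicted as this class
            recall = 1.0                           # so all of its true rows are recovered
        else:
            precision = 0.0  # never predicted: treated as 0 (assumed Spark convention)
            recall = 0.0
        denom = beta ** 2 * precision + recall
        f = 0.0 if denom == 0 else (1 + beta ** 2) * precision * recall / denom
        weighted_f += frac * f  # weight each class's F-beta by its frequency
    return accuracy, weighted_f


acc, f2 = majority_baseline({"minority": 0.36, "majority": 0.64}, beta=2.0)
print(f"accuracy={acc:.3f}, weighted F2={f2:.3f}")  # prints: accuracy=0.640, weighted F2=0.575
```

Metrics close to these values are what a model that has learned nothing looks like on this class split.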

*Both the sparkml and H2O implementations, coded the same way as for my
actual dataset, performed well on my benchmarking dataset
<https://archive.ics.uci.edu/ml/datasets/Car+Evaluation>* (note that the
car.data file you download from there has no column names; I added them
manually), so I don't think anything is simply wrong with the way I
implemented the sparkml code itself. What I find odd is that the sparkml
model does not attribute any importance to any of the features (i.e. every
value in the trained model's featureImportances is zero). *I would expect
at least something in the featureImportances when mimicking the broad
strokes of the H2O hyperparams (imbalanced classes or not)*. So not only is
the sparkml implementation not learning anything compared to the H2O
version, it does not even seem to think that any of the provided features
are important *at all* in making a decision about the samples.

An example of how I'm building and training the sparkml pipeline can be
found here: https://gist.github.com/reedv/409df80f516ec17e330510365f75f558 (my
actual hyperparams are shown further down this post)

An example of how I'm training the H2O implementation can be found here:
https://gist.github.com/reedv/169856f9442354a404fc0e1e0d3e8aa8 (the example
uses the benchmarking data, but the hyperparams for the actual
H2ORandomForestClassifier are the same as with my actual dataset).

My basic sparkml pipeline and training code looks like this:


import datetime
import pprint

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

pp = pprint.PrettyPrinter(indent=1)

# dff is the Spark DataFrame holding the dataset (loaded earlier, not shown)
training_features = [...]  # <list of all the training features in the dataset>
print("Training features:")
print(training_features)

# convert response label to categorical index for spark
label_idxer = StringIndexer(inputCol="outcome",
                            outputCol="label").fit(dff)
# convert all categorical features to indexes for spark
feature_idxer = StringIndexer(inputCols=training_features,
                              outputCols=[x + "Index" for x in training_features],
                              handleInvalid="keep").fit(dff)

training_features = [x + "Index" for x in training_features]
print("Training features:")
print(training_features)

# convert all features per record into single vectors to feed to spark ML transformer
assembler = VectorAssembler(inputCols=training_features,
                            outputCol="features")

# create a RDF estimator
rf = RandomForestClassifier(labelCol="label",
                            featuresCol="features",
                            seed=1819511352808605668)
pp.pprint(vars(rf))
pp.pprint(rf.getRawPredictionCol())
pp.pprint(rf.getProbabilityCol())
pp.pprint(rf.getPredictionCol())

# convert prediction output category indexes back to strings
label_converter = IndexToString(inputCol=rf.getPredictionCol(),
                                outputCol="prediction_label",
                                labels=label_idxer.labels)

pipeline = Pipeline(stages=[label_idxer, feature_idxer, assembler,
                            rf,
                            label_converter])  # type: pyspark.ml.pipeline.Pipeline

# we would normally then do something like...
#   pipeline_transformer = pipeline.fit(spark_df)
#   prediction_df = pipeline_transformer.transform(spark_df)
# ...(see https://spark.apache.org/docs/latest/ml-pipeline.html#pipeline-components)
# ...but instead, we are going to use cross validation to optimize the RDF w/in the pipeline

rfparamGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [20, 30, 60, 90]) \
    .addGrid(rf.maxBins, [10000, 30000, 100000, 300000]) \
    .addGrid(rf.numTrees, [37, 64, 280, 370]) \
    .addGrid(rf.minInstancesPerNode, [1]) \
    .addGrid(rf.minInfoGain, [0.0, 0.25, 1.0]) \
    .addGrid(rf.subsamplingRate, [0.5, 0.75, 1.0]) \
    .addGrid(rf.bootstrap, [True]) \
    .addGrid(rf.featureSubsetStrategy, ['auto']) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=rfparamGrid,
                          # since my data is imbalanced, I'm using F2 scoring metric
                          evaluator=MulticlassClassificationEvaluator(
                              labelCol="label",
                              predictionCol=rf.getPredictionCol(),
                              metricName="weightedFMeasure", beta=2),
                          numFolds=3)

display(dff.head(n=3))  # Databricks notebook display helper
(train_u, test_u) = dff.randomSplit([0.8, 0.2])
assert train_u.dtypes == test_u.dtypes

# fit on the cross validation estimator to get the optimal pipeline transformer/model
print(datetime.datetime.now())
best_rf_pipeline = crossval.fit(train_u)  # type: pyspark.ml.tuning.CrossValidatorModel
print(datetime.datetime.now())

# now let's look at how it performs on the withheld test data as well as
# inspecting some aspects of the RDF model w/in the pipeline
test_prediction = best_rf_pipeline.transform(test_u)

evals = MulticlassClassificationEvaluator(labelCol="label",
                                          predictionCol=rf.getPredictionCol())

statistics = {
    "acc": evals.evaluate(test_prediction, {evals.metricName: "accuracy"}),
    "recall": evals.evaluate(test_prediction, {evals.metricName: "weightedRecall"}),
    "precision": evals.evaluate(test_prediction, {evals.metricName: "weightedPrecision"}),
    "f1": evals.evaluate(test_prediction, {evals.metricName: "f1"}),
    "f2": evals.evaluate(test_prediction, {evals.metricName: "weightedFMeasure",
                                           evals.beta: 2}),
}
print("Model Information")
for stat in statistics:
    print(stat + ": " + str(statistics[stat]))

print("Model Feature Importance:")
print(type(best_rf_pipeline))
print(type(best_rf_pipeline.bestModel))
for index, stage in enumerate(best_rf_pipeline.bestModel.stages):
    print(f"{index}: {type(stage)}")
# stage 3 of the best pipeline is the fitted RandomForestClassificationModel
best_rf = best_rf_pipeline.bestModel.stages[3]
pp.pprint(type(best_rf.featureImportances))
pp.pprint(best_rf.featureImportances)
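To check whether the all-zero featureImportances come from the particular hyperparameters CrossValidator picked, one option is to print the average CV metric for each param map and count how many trees in the best model are a single leaf (a tree that never splits contributes nothing to featureImportances). This is a sketch assuming the `best_rf_pipeline` and `best_rf` objects from the code above and Spark 3.x, using `CrossValidatorModel.avgMetrics`, `getEstimatorParamMaps()`, the forest model's `trees`, and each tree's `numNodes`; the helper names `summarize_cv` and `count_stump_trees` are hypothetical.

```python
def summarize_cv(cv_model):
    """Return (avg_metric, {param_name: value}) pairs sorted best-first.

    Assumes a larger metric is better, as with weightedFMeasure.
    """
    results = []
    for param_map, metric in zip(cv_model.getEstimatorParamMaps(), cv_model.avgMetrics):
        # ParamMap keys are Param objects; .name gives e.g. "maxDepth"
        results.append((metric, {p.name: v for p, v in param_map.items()}))
    # sort on the metric only, so the dicts are never compared
    return sorted(results, key=lambda r: r[0], reverse=True)


def count_stump_trees(rf_model):
    """Count trees in a fitted random forest model that never split (numNodes == 1)."""
    return sum(1 for tree in rf_model.trees if tree.numNodes == 1)


# Usage with the objects from the pipeline code above:
# for metric, params in summarize_cv(best_rf_pipeline)[:5]:
#     print(round(metric, 4), params)
# print(count_stump_trees(best_rf), "of", len(best_rf.trees), "trees never split")
```

Both helpers only read attributes, so they can be run on an already fitted model without retraining.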



These are the configs for the H2O model (with settings that seemed clearly
irrelevant removed; again, I don't know what the issue is, so some
irrelevant ones may still be included):



{
 'auc_type': {'actual': 'AUTO', 'default': 'AUTO', 'input': 'AUTO'},
 'balance_classes': {'actual': True, 'default': False, 'input': True},
 'binomial_double_trees': {'actual': True, 'default': False, 'input': True},
 'build_tree_one_node': {'actual': False, 'default': False, 'input': False},
 'calibrate_model': {'actual': False, 'default': False, 'input': False},
 'calibration_frame': {'actual': None, 'default': None, 'input': None},
 'categorical_encoding': {'actual': 'Enum', 'default': 'AUTO', 'input': 'AUTO'},
 'check_constant_response': {'actual': True, 'default': True, 'input': True},
 'checkpoint': {'actual': None, 'default': None, 'input': None},
 'class_sampling_factors': {'actual': None, 'default': None, 'input': None},
 'col_sample_rate_change_per_level': {'actual': 1.0,
                                      'default': 1.0,
                                      'input': 1.0},
 'col_sample_rate_per_tree': {'actual': 1.0, 'default': 1.0, 'input': 1.0},
 'custom_metric_func': {'actual': None, 'default': None, 'input': None},
 'distribution': {'actual': 'multinomial',
                  'default': 'AUTO',
                  'input': 'multinomial'},
 'export_checkpoints_dir': {'actual': None, 'default': None, 'input': None},
 'fold_assignment': {'actual': None, 'default': 'AUTO', 'input': 'AUTO'},
 'fold_column': {'actual': None, 'default': None, 'input': None},
 'gainslift_bins': {'actual': -1, 'default': -1, 'input': -1},
 'histogram_type': {'actual': 'UniformAdaptive',
                    'default': 'AUTO',
                    'input': 'AUTO'},
 'ignore_const_cols': {'actual': True, 'default': True, 'input': True},
 'keep_cross_validation_fold_assignment': {'actual': False,
                                           'default': False,
                                           'input': False},
 'keep_cross_validation_models': {'actual': True,
                                  'default': True,
                                  'input': True},
 'keep_cross_validation_predictions': {'actual': False,
                                       'default': False,
                                       'input': False},
 'max_after_balance_size': {'actual': 5.0, 'default': 5.0, 'input': 5.0},
 'max_confusion_matrix_size': {'actual': 20, 'default': 20, 'input': 20},
 'max_depth': {'actual': 20, 'default': 20, 'input': 20},
 'max_runtime_secs': {'actual': 10800.0, 'default': 0.0, 'input': 10800.0},
 'min_rows': {'actual': 1.0, 'default': 1.0, 'input': 1.0},
 'min_split_improvement': {'actual': 1e-05, 'default': 1e-05, 'input': 1e-05},
 'mtries': {'actual': -1, 'default': -1, 'input': -1},
 'nbins': {'actual': 32, 'default': 20, 'input': 32},
 'nbins_cats': {'actual': 1024, 'default': 1024, 'input': 1024},
 'nbins_top_level': {'actual': 1024, 'default': 1024, 'input': 1024},
 'nfolds': {'actual': 0, 'default': 0, 'input': 0},
 'ntrees': {'actual': 64, 'default': 50, 'input': 64},
 'offset_column': {'actual': None, 'default': None, 'input': None},
 'r2_stopping': {'actual': 1.7976931348623157e+308,
                 'default': 1.7976931348623157e+308,
                 'input': 1.7976931348623157e+308},
 'response_column': {'actual': {'__meta': {'schema_name': 'ColSpecifierV3',
                                           'schema_type': 'VecSpecifier',
                                           'schema_version': 3},
                                'column_name': 'outcome',
                                'is_member_of_frames': None},
                     'default': None,
                     'input': {'__meta': {'schema_name': 'ColSpecifierV3',
                                          'schema_type': 'VecSpecifier',
                                          'schema_version': 3},
                               'column_name': 'outcome',
                               'is_member_of_frames': None}},
 'sample_rate': {'actual': 0.632, 'default': 0.632, 'input': 0.632},
 'sample_rate_per_class': {'actual': None, 'default': None, 'input': None},
 'score_each_iteration': {'actual': False, 'default': False, 'input': False},
 'score_tree_interval': {'actual': 0, 'default': 0, 'input': 0},
 'seed': {'actual': 1819511352808605668, 'default': -1, 'input': -1},
 'stopping_metric': {'actual': None, 'default': 'AUTO', 'input': 'AUTO'},
 'stopping_rounds': {'actual': 0, 'default': 0, 'input': 0},
 'stopping_tolerance': {'actual': 0.001, 'default': 0.001, 'input': 0.001},
 'weights_column': {'actual': None, 'default': None, 'input': None}
}
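The H2O config above sets `balance_classes: True`, which H2O implements by oversampling the minority class. sparkml's RandomForestClassifier has no such flag; a related but different technique, available since Spark 3.0, is per-row weights through its `weightCol` parameter. A minimal sketch, assuming inverse-frequency ("balanced") weights are what you want; `balanced_class_weights` is a hypothetical helper, the 36/64 counts are illustrative only, and the commented Spark wiring is untested.

```python
def balanced_class_weights(class_counts):
    """Inverse-frequency weights: total / (num_classes * count) per class.

    With these weights every class contributes the same total weight.
    """
    total = sum(class_counts.values())
    n_classes = len(class_counts)
    return {label: total / (n_classes * count) for label, count in class_counts.items()}


weights = balanced_class_weights({0: 36, 1: 64})  # illustrative counts only
print(weights)

# Hypothetical wiring into Spark >= 3.0 (untested sketch):
# from pyspark.sql import functions as F
# indexed_df = label_idxer.transform(dff)  # adds the indexed "label" column
# weighted_df = indexed_df.withColumn(
#     "weight", F.when(F.col("label") == 1.0, weights[1]).otherwise(weights[0]))
# rf = RandomForestClassifier(labelCol="label", featuresCol="features", weightCol="weight")
```

Weighting changes how much each row counts in the impurity calculations rather than duplicating rows, so results will not match H2O's oversampling exactly.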



These are the configs for the sparkml model (which I use in a 3-fold cross
validation
<https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation>).
Note that I was not sure how to duplicate certain H2O hyperparams in Spark
(e.g. H2O's random forest supports "binomial_double_trees
<https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/algo-params/binomial_double_trees.html>",
while sparkml's doesn't, and sparkml's RF requires maxBins to be at least
the number of distinct values in each categorical feature, while H2O's does
not).
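On the maxBins point: Spark's check is that maxBins must be at least the number of distinct values of the largest categorical feature (the StringIndexer output carries nominal metadata that the VectorAssembler passes along), rather than the number of features. A small plain-Python sketch, assuming you already have distinct-value counts per feature; the `keep_invalid` adjustment is an assumption that `handleInvalid="keep"` adds one extra index per feature for unseen values, and the helper name, feature names and counts are hypothetical.

```python
def min_required_max_bins(distinct_counts, keep_invalid=True):
    """Smallest maxBins that covers every categorical feature.

    distinct_counts maps feature name -> number of distinct values.
    keep_invalid=True assumes StringIndexer(handleInvalid="keep") adds one
    extra index per feature for unseen/invalid values.
    """
    extra = 1 if keep_invalid else 0
    return max(count + extra for count in distinct_counts.values())


# hypothetical feature names and counts, for illustration only
print(min_required_max_bins({"colorIndex": 5, "zipIndex": 812, "sizeIndex": 3}))  # prints: 813
```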



rfparamGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [20, 30, 60, 90]) \
    .addGrid(rf.maxBins, [10000, 30000, 100000, 300000]) \
    .addGrid(rf.numTrees, [37, 64, 280, 370]) \
    .addGrid(rf.minInstancesPerNode, [1]) \
    .addGrid(rf.minInfoGain, [0.0, 0.25, 1.0]) \
    .addGrid(rf.subsamplingRate, [0.5, 0.75, 1.0]) \
    .addGrid(rf.bootstrap, [True]) \
    .addGrid(rf.featureSubsetStrategy, ['auto']) \
    .build()

(Even when using just the maxDepth, maxBins, and numTrees values closest to
the H2O version, the results for the sparkml model are the same: it learns
nothing beyond the distribution of the responses themselves, and all of its
featureImportances are still zero.)
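One arithmetic sanity check on the grid's minInfoGain values: in Spark the gain of a split is the parent node's impurity minus the weighted impurity of its children, so it can never exceed the parent's impurity. With the default Gini impurity, a two-class node has impurity at most 0.5, and the root of a 36/64 split has 1 - (0.36² + 0.64²) = 0.4608. The plain-Python sketch below computes that bound; `gini` and `max_possible_gain` are hypothetical helper names.

```python
def gini(fractions):
    """Gini impurity 1 - sum(p_i^2) for class fractions summing to 1."""
    return 1.0 - sum(p * p for p in fractions)


def max_possible_gain(parent_fractions):
    """Upper bound on a split's information gain: children's impurity is
    >= 0, so gain = parent - weighted_children <= parent impurity."""
    return gini(parent_fractions)


root_bound = max_possible_gain([0.36, 0.64])
print(f"max gain at the root: {root_bound:.4f}")  # prints: max gain at the root: 0.4608
for min_info_gain in [0.0, 0.25, 1.0]:
    # a split is kept only if its gain reaches minInfoGain
    print(min_info_gain, "root split possible:", root_bound >= min_info_gain)
```

If that reasoning holds, grid points with minInfoGain=1.0 can only produce single-leaf trees under Gini, and such trees give all-zero featureImportances.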

Does anyone with more experience have any ideas what could be going on here?
Do you see any implementation/usage mistakes I'm making that could be causing
the sparkml pipeline to train so poorly?
