Repository: spark
Updated Branches:
  refs/heads/master 244bcff19 -> 3cb1b5780
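The commit below migrates every spark.ml trainer from the old `Instrumentation.create(...)` / `instr.logSuccess(model)` bookkeeping to the `Instrumentation.instrumented` wrapper introduced by #21719. Schematically, each `train`/`fit` method in the diff changes as follows (a simplified sketch of the pattern only; `MyModel` and `doTrain` are placeholders, not Spark APIs):

    // Before: each trainer allocated its own session and had to remember
    // to call logSuccess on the model it produced.
    override protected def train(dataset: Dataset[_]): MyModel = {
      val instr = Instrumentation.create(this, dataset)
      instr.logParams(maxIter, tol)
      val model = doTrain(dataset)
      instr.logSuccess(model)
      model
    }

    // After: Instrumentation.instrumented owns the session lifecycle. The body
    // logs the stage, the dataset, and its params (now passing `this` explicitly),
    // and its last expression (the model) is returned unchanged.
    override protected def train(dataset: Dataset[_]): MyModel = instrumented { instr =>
      instr.logPipelineStage(this)
      instr.logDataset(dataset)
      instr.logParams(this, maxIter, tol)
      doTrain(dataset)
    }

Because the wrapper logs completion (and failure) itself, the `val m = ...; instr.logSuccess(m); m` boilerplate disappears throughout the diff.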
[SPARK-24852][ML] Update spark.ml to use Instrumentation.instrumented.

## What changes were proposed in this pull request?

Follow-up to #21719. Update the spark.ml training code to fully wrap training methods in `Instrumentation.instrumented` and remove the old instrumentation APIs.

## How was this patch tested?

Existing tests.

Author: Bago Amirbekian <b...@databricks.com>

Closes #21799 from MrBago/new-instrumentation-apis2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cb1b578
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cb1b578
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cb1b578

Branch: refs/heads/master
Commit: 3cb1b57809d0b4a93223669f5c10cea8fc53eff6
Parents: 244bcff
Author: Bago Amirbekian <b...@databricks.com>
Authored: Fri Jul 20 12:13:15 2018 -0700
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Fri Jul 20 12:13:15 2018 -0700

----------------------------------------------------------------------
 .../classification/DecisionTreeClassifier.scala | 24 +++++-----
 .../spark/ml/classification/GBTClassifier.scala | 14 +++---
 .../spark/ml/classification/LinearSVC.scala     | 12 ++---
 .../ml/classification/LogisticRegression.scala  |  2 +-
 .../MultilayerPerceptronClassifier.scala        | 14 +++---
 .../spark/ml/classification/NaiveBayes.scala    | 12 ++---
 .../spark/ml/classification/OneVsRest.scala     |  9 ++--
 .../classification/RandomForestClassifier.scala | 13 +++---
 .../spark/ml/clustering/BisectingKMeans.scala   | 12 ++---
 .../spark/ml/clustering/GaussianMixture.scala   | 12 ++---
 .../org/apache/spark/ml/clustering/KMeans.scala |  9 ++--
 .../org/apache/spark/ml/clustering/LDA.scala    | 12 ++---
 .../org/apache/spark/ml/fpm/FPGrowth.scala      | 12 ++---
 .../apache/spark/ml/recommendation/ALS.scala    |  9 ++--
 .../ml/regression/AFTSurvivalRegression.scala   | 13 +++---
 .../ml/regression/DecisionTreeRegressor.scala   | 24 +++++-----
 .../spark/ml/regression/GBTRegressor.scala      | 12 ++---
 .../GeneralizedLinearRegression.scala           | 12 ++---
 .../ml/regression/IsotonicRegression.scala      | 12 ++---
 .../spark/ml/regression/LinearRegression.scala  | 21 ++++-----
 .../ml/regression/RandomForestRegressor.scala   | 14 +++---
 .../apache/spark/ml/tuning/CrossValidator.scala |  9 ++--
 .../spark/ml/tuning/TrainValidationSplit.scala  |  9 ++--
 .../apache/spark/ml/util/Instrumentation.scala  | 47 ++------------------
 24 files changed, 153 insertions(+), 186 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala index c9786f1..8a57bfc 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala @@ -29,6 +29,7 @@ import org.apache.spark.ml.tree._ import org.apache.spark.ml.tree.DecisionTreeModelReadWrite._ import org.apache.spark.ml.tree.impl.RandomForest import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy} import
org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel} import org.apache.spark.rdd.RDD @@ -96,8 +97,10 @@ class DecisionTreeClassifier @Since("1.4.0") ( @Since("1.6.0") override def setSeed(value: Long): this.type = set(seed, value) - override protected def train(dataset: Dataset[_]): DecisionTreeClassificationModel = { - val instr = Instrumentation.create(this, dataset) + override protected def train( + dataset: Dataset[_]): DecisionTreeClassificationModel = instrumented { instr => + instr.logPipelineStage(this) + instr.logDataset(dataset) val categoricalFeatures: Map[Int, Int] = MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol))) val numClasses: Int = getNumClasses(dataset) @@ -112,30 +115,27 @@ class DecisionTreeClassifier @Since("1.4.0") ( val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset, numClasses) val strategy = getOldStrategy(categoricalFeatures, numClasses) - instr.logParams(maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB, + instr.logParams(this, maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB, cacheNodeIds, checkpointInterval, impurity, seed) val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all", seed = $(seed), instr = Some(instr), parentUID = Some(uid)) - val m = trees.head.asInstanceOf[DecisionTreeClassificationModel] - instr.logSuccess(m) - m + trees.head.asInstanceOf[DecisionTreeClassificationModel] } /** (private[ml]) Train a decision tree on an RDD */ private[ml] def train(data: RDD[LabeledPoint], - oldStrategy: OldStrategy): DecisionTreeClassificationModel = { - val instr = Instrumentation.create(this, data) - instr.logParams(maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB, + oldStrategy: OldStrategy): DecisionTreeClassificationModel = instrumented { instr => + instr.logPipelineStage(this) + instr.logDataset(data) + instr.logParams(this, maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB, cacheNodeIds, checkpointInterval, impurity, seed) val trees = RandomForest.run(data, oldStrategy, numTrees = 1, featureSubsetStrategy = "all", seed = 0L, instr = Some(instr), parentUID = Some(uid)) - val m = trees.head.asInstanceOf[DecisionTreeClassificationModel] - instr.logSuccess(m) - m + trees.head.asInstanceOf[DecisionTreeClassificationModel] } /** (private[ml]) Create a Strategy instance to use with the old API. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala index 337133a..33acd99 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala @@ -31,9 +31,9 @@ import org.apache.spark.ml.tree._ import org.apache.spark.ml.tree.impl.GradientBoostedTrees import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo} import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel} -import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ @@ -152,7 +152,8 @@ class GBTClassifier @Since("1.4.0") ( set(validationIndicatorCol, value) } - override protected def train(dataset: Dataset[_]): GBTClassificationModel = { + override protected def train( + dataset: Dataset[_]): GBTClassificationModel = instrumented { instr => val categoricalFeatures: Map[Int, Int] = MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol))) @@ -189,8 +190,9 @@ class GBTClassifier @Since("1.4.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } - val instr = Instrumentation.create(this, dataset) - instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType, + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, labelCol, featuresCol, predictionCol, impurity, lossType, maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode, seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval, featureSubsetStrategy, validationIndicatorCol) @@ -204,9 +206,7 @@ class GBTClassifier @Since("1.4.0") ( GradientBoostedTrees.run(trainDataset, boostingStrategy, $(seed), $(featureSubsetStrategy)) } - val m = new GBTClassificationModel(uid, baseLearners, learnerWeights, numFeatures) - instr.logSuccess(m) - m + new GBTClassificationModel(uid, baseLearners, learnerWeights, numFeatures) } @Since("1.4.1") http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 38eb045..20f9366 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -33,6 +33,7 @@ import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer import org.apache.spark.rdd.RDD @@ -162,7 +163,7 @@ class LinearSVC @Since("2.2.0") ( @Since("2.2.0") override def copy(extra: ParamMap): LinearSVC = 
defaultCopy(extra) - override protected def train(dataset: Dataset[_]): LinearSVCModel = { + override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr => val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map { @@ -170,8 +171,9 @@ class LinearSVC @Since("2.2.0") ( Instance(label, weight, features) } - val instr = Instrumentation.create(this, dataset) - instr.logParams(regParam, maxIter, fitIntercept, tol, standardization, threshold, + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth) val (summarizer, labelSummarizer) = { @@ -276,9 +278,7 @@ class LinearSVC @Since("2.2.0") ( (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result()) } - val model = copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector)) - instr.logSuccess(model) - model + copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 25fb9c8..af651b0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -503,7 +503,7 @@ class LogisticRegression @Since("1.2.0") ( instr.logPipelineStage(this) instr.logDataset(dataset) - instr.logParams(regParam, elasticNetParam, standardization, threshold, + instr.logParams(this, regParam, elasticNetParam, standardization, threshold, maxIter, tol, fitIntercept) val (summarizer, labelSummarizer) = { http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala index 57ba47e..65e3b2d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala @@ -28,6 +28,7 @@ import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.sql.Dataset /** Params for Multilayer Perceptron. 
*/ @@ -230,9 +231,11 @@ class MultilayerPerceptronClassifier @Since("1.5.0") ( * @param dataset Training dataset * @return Fitted model */ - override protected def train(dataset: Dataset[_]): MultilayerPerceptronClassificationModel = { - val instr = Instrumentation.create(this, dataset) - instr.logParams(labelCol, featuresCol, predictionCol, layers, maxIter, tol, + override protected def train( + dataset: Dataset[_]): MultilayerPerceptronClassificationModel = instrumented { instr => + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, labelCol, featuresCol, predictionCol, layers, maxIter, tol, blockSize, solver, stepSize, seed) val myLayers = $(layers) @@ -264,10 +267,7 @@ class MultilayerPerceptronClassifier @Since("1.5.0") ( } trainer.setStackSize($(blockSize)) val mlpModel = trainer.train(data) - val model = new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights) - - instr.logSuccess(model) - model + new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights) } } http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala index 1dde18d..f65d397 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -25,6 +25,7 @@ import org.apache.spark.ml.linalg._ import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared.HasWeightCol import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.util.MLUtils import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.functions.{col, lit} @@ -125,8 +126,9 @@ class NaiveBayes @Since("1.5.0") ( */ private[spark] def trainWithLabelCheck( dataset: Dataset[_], - positiveLabel: Boolean): NaiveBayesModel = { - val instr = Instrumentation.create(this, dataset) + positiveLabel: Boolean): NaiveBayesModel = instrumented { instr => + instr.logPipelineStage(this) + instr.logDataset(dataset) if (positiveLabel && isDefined(thresholds)) { val numClasses = getNumClasses(dataset) instr.logNumClasses(numClasses) @@ -148,7 +150,7 @@ class NaiveBayes @Since("1.5.0") ( } } - instr.logParams(labelCol, featuresCol, weightCol, predictionCol, rawPredictionCol, + instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, rawPredictionCol, probabilityCol, modelType, smoothing, thresholds) val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size @@ -204,9 +206,7 @@ class NaiveBayes @Since("1.5.0") ( val pi = Vectors.dense(piArray) val theta = new DenseMatrix(numLabels, numFeatures, thetaArray, true) - val model = new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray) - instr.logSuccess(model) - model + new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray) } @Since("1.5.0") http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index 3474b61..1835a91 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -36,6 +36,7 @@ import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params} import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol} import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ @@ -362,11 +363,12 @@ final class OneVsRest @Since("1.4.0") ( } @Since("2.0.0") - override def fit(dataset: Dataset[_]): OneVsRestModel = { + override def fit(dataset: Dataset[_]): OneVsRestModel = instrumented { instr => transformSchema(dataset.schema) - val instr = Instrumentation.create(this, dataset) - instr.logParams(labelCol, featuresCol, predictionCol, parallelism, rawPredictionCol) + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, labelCol, featuresCol, predictionCol, parallelism, rawPredictionCol) instr.logNamedValue("classifier", $(classifier).getClass.getCanonicalName) // determine number of classes either from metadata if provided, or via computation. @@ -440,7 +442,6 @@ final class OneVsRest @Since("1.4.0") ( case attr: Attribute => attr } val model = new OneVsRestModel(uid, labelAttribute.toMetadata(), models).setParent(this) - instr.logSuccess(model) copyValues(model) } http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala index 040db3b..94887ac 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala @@ -28,6 +28,7 @@ import org.apache.spark.ml.tree._ import org.apache.spark.ml.tree.impl.RandomForest import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo} import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel} import org.apache.spark.rdd.RDD @@ -115,8 +116,10 @@ class RandomForestClassifier @Since("1.4.0") ( override def setFeatureSubsetStrategy(value: String): this.type = set(featureSubsetStrategy, value) - override protected def train(dataset: Dataset[_]): RandomForestClassificationModel = { - val instr = Instrumentation.create(this, dataset) + override protected def train( + dataset: Dataset[_]): RandomForestClassificationModel = instrumented { instr => + instr.logPipelineStage(this) + instr.logDataset(dataset) val categoricalFeatures: Map[Int, Int] = MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol))) val numClasses: Int = getNumClasses(dataset) @@ -131,7 +134,7 @@ class RandomForestClassifier @Since("1.4.0") ( val strategy = super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity) - instr.logParams(labelCol, featuresCol, predictionCol, 
probabilityCol, rawPredictionCol, + instr.logParams(this, labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol, impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain, minInstancesPerNode, seed, subsamplingRate, thresholds, cacheNodeIds, checkpointInterval) @@ -140,11 +143,9 @@ class RandomForestClassifier @Since("1.4.0") ( .map(_.asInstanceOf[DecisionTreeClassificationModel]) val numFeatures = oldDataset.first().features.size - val m = new RandomForestClassificationModel(uid, trees, numFeatures, numClasses) instr.logNumClasses(numClasses) instr.logNumFeatures(numFeatures) - instr.logSuccess(m) - m + new RandomForestClassificationModel(uid, trees, numFeatures, numClasses) } @Since("1.4.1") http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala index de56447..48b8c52 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala @@ -26,6 +26,7 @@ import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.clustering.{BisectingKMeans => MLlibBisectingKMeans, BisectingKMeansModel => MLlibBisectingKMeansModel} import org.apache.spark.mllib.linalg.VectorImplicits._ @@ -257,12 +258,13 @@ class BisectingKMeans @Since("2.0.0") ( def setDistanceMeasure(value: String): this.type = set(distanceMeasure, value) @Since("2.0.0") - override def fit(dataset: Dataset[_]): BisectingKMeansModel = { + override def fit(dataset: Dataset[_]): BisectingKMeansModel = instrumented { instr => transformSchema(dataset.schema, logging = true) val rdd = DatasetUtils.columnToOldVector(dataset, getFeaturesCol) - val instr = Instrumentation.create(this, dataset) - instr.logParams(featuresCol, predictionCol, k, maxIter, seed, + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, featuresCol, predictionCol, k, maxIter, seed, minDivisibleClusterSize, distanceMeasure) val bkm = new MLlibBisectingKMeans() @@ -275,10 +277,8 @@ class BisectingKMeans @Since("2.0.0") ( val model = copyValues(new BisectingKMeansModel(uid, parentModel).setParent(this)) val summary = new BisectingKMeansSummary( model.transform(dataset), $(predictionCol), $(featuresCol), $(k), $(maxIter)) - model.setSummary(Some(summary)) instr.logNamedValue("clusterSizes", summary.clusterSizes) - instr.logSuccess(model) - model + model.setSummary(Some(summary)) } @Since("2.0.0") http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index f0707b3..310b03b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -29,6 +29,7 @@ import org.apache.spark.ml.param._ import 
org.apache.spark.ml.param.shared._ import org.apache.spark.ml.stat.distribution.MultivariateGaussian import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Matrix => OldMatrix, Vector => OldVector, Vectors => OldVectors} import org.apache.spark.rdd.RDD @@ -335,7 +336,7 @@ class GaussianMixture @Since("2.0.0") ( private val numSamples = 5 @Since("2.0.0") - override def fit(dataset: Dataset[_]): GaussianMixtureModel = { + override def fit(dataset: Dataset[_]): GaussianMixtureModel = instrumented { instr => transformSchema(dataset.schema, logging = true) val sc = dataset.sparkSession.sparkContext @@ -352,8 +353,9 @@ class GaussianMixture @Since("2.0.0") ( s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of the covariance" + s" matrix is quadratic in the number of features.") - val instr = Instrumentation.create(this, dataset) - instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol) + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol) instr.logNumFeatures(numFeatures) val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians( @@ -425,11 +427,9 @@ class GaussianMixture @Since("2.0.0") ( val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this) val summary = new GaussianMixtureSummary(model.transform(dataset), $(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iter) - model.setSummary(Some(summary)) instr.logNamedValue("logLikelihood", logLikelihood) instr.logNamedValue("clusterSizes", summary.clusterSizes) - instr.logSuccess(model) - model + model.setSummary(Some(summary)) } @Since("2.0.0") http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 6f4a30d..498310d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -28,6 +28,7 @@ import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel} import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.VectorImplicits._ @@ -336,7 +337,7 @@ class KMeans @Since("1.5.0") ( def setSeed(value: Long): this.type = set(seed, value) @Since("2.0.0") - override def fit(dataset: Dataset[_]): KMeansModel = { + override def fit(dataset: Dataset[_]): KMeansModel = instrumented { instr => transformSchema(dataset.schema, logging = true) val handlePersistence = dataset.storageLevel == StorageLevel.NONE @@ -346,8 +347,9 @@ class KMeans @Since("1.5.0") ( instances.persist(StorageLevel.MEMORY_AND_DISK) } - val instr = Instrumentation.create(this, dataset) - instr.logParams(featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure, + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, featuresCol, predictionCol, 
k, initMode, initSteps, distanceMeasure, maxIter, seed, tol) val algo = new MLlibKMeans() .setK($(k)) @@ -369,7 +371,6 @@ class KMeans @Since("1.5.0") ( model.setSummary(Some(summary)) instr.logNamedValue("clusterSizes", summary.clusterSizes) - instr.logSuccess(model) if (handlePersistence) { instances.unpersist() } http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index fed42c9..50867f7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -32,6 +32,7 @@ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed} import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, @@ -896,11 +897,12 @@ class LDA @Since("1.6.0") ( override def copy(extra: ParamMap): LDA = defaultCopy(extra) @Since("2.0.0") - override def fit(dataset: Dataset[_]): LDAModel = { + override def fit(dataset: Dataset[_]): LDAModel = instrumented { instr => transformSchema(dataset.schema, logging = true) - val instr = Instrumentation.create(this, dataset) - instr.logParams(featuresCol, topicDistributionCol, k, maxIter, subsamplingRate, + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, featuresCol, topicDistributionCol, k, maxIter, subsamplingRate, checkpointInterval, keepLastCheckpoint, optimizeDocConcentration, topicConcentration, learningDecay, optimizer, learningOffset, seed) @@ -923,9 +925,7 @@ class LDA @Since("1.6.0") ( } instr.logNumFeatures(newModel.vocabSize) - val model = copyValues(newModel).setParent(this) - instr.logSuccess(model) - model + copyValues(newModel).setParent(this) } @Since("1.6.0") http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index 9d664b6..85c483c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -26,6 +26,7 @@ import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.HasPredictionCol import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.fpm.{AssociationRules => MLlibAssociationRules, FPGrowth => MLlibFPGrowth} import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset @@ -158,11 +159,12 @@ class FPGrowth @Since("2.2.0") ( genericFit(dataset) } - private def genericFit[T: ClassTag](dataset: Dataset[_]): FPGrowthModel = { + private def genericFit[T: ClassTag](dataset: Dataset[_]): FPGrowthModel = instrumented { instr => val handlePersistence = dataset.storageLevel == StorageLevel.NONE - val instr = 
Instrumentation.create(this, dataset) - instr.logParams(params: _*) + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, params: _*) val data = dataset.select($(itemsCol)) val items = data.where(col($(itemsCol)).isNotNull).rdd.map(r => r.getSeq[Any](0).toArray) val mllibFP = new MLlibFPGrowth().setMinSupport($(minSupport)) @@ -185,9 +187,7 @@ class FPGrowth @Since("2.2.0") ( items.unpersist() } - val model = copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this) - instr.logSuccess(model) - model + copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this) } @Since("2.2.0") http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index a23f955..ffe5927 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -39,6 +39,7 @@ import org.apache.spark.ml.linalg.BLAS import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.linalg.CholeskyDecomposition import org.apache.spark.mllib.optimization.NNLS import org.apache.spark.rdd.RDD @@ -654,7 +655,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel] } @Since("2.0.0") - override def fit(dataset: Dataset[_]): ALSModel = { + override def fit(dataset: Dataset[_]): ALSModel = instrumented { instr => transformSchema(dataset.schema) import dataset.sparkSession.implicits._ @@ -666,8 +667,9 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel] Rating(row.getInt(0), row.getInt(1), row.getFloat(2)) } - val instr = Instrumentation.create(this, ratings) - instr.logParams(rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol, + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol, itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval, seed, intermediateStorageLevel, finalStorageLevel) @@ -681,7 +683,6 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel] val userDF = userFactors.toDF("id", "features") val itemDF = itemFactors.toDF("id", "features") val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this) - instr.logSuccess(model) copyValues(model) } http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index e27a96e..3cd0706 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -32,6 +32,7 @@ import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ +import 
org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer import org.apache.spark.mllib.util.MLUtils @@ -210,7 +211,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S } @Since("2.0.0") - override def fit(dataset: Dataset[_]): AFTSurvivalRegressionModel = { + override def fit(dataset: Dataset[_]): AFTSurvivalRegressionModel = instrumented { instr => transformSchema(dataset.schema, logging = true) val instances = extractAFTPoints(dataset) val handlePersistence = dataset.storageLevel == StorageLevel.NONE @@ -229,8 +230,9 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) val numFeatures = featuresStd.size - val instr = Instrumentation.create(this, dataset) - instr.logParams(labelCol, featuresCol, censorCol, predictionCol, quantilesCol, + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, labelCol, featuresCol, censorCol, predictionCol, quantilesCol, fitIntercept, maxIter, tol, aggregationDepth) instr.logNamedValue("quantileProbabilities.size", $(quantileProbabilities).length) instr.logNumFeatures(numFeatures) @@ -284,10 +286,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S val coefficients = Vectors.dense(rawCoefficients) val intercept = parameters(1) val scale = math.exp(parameters(0)) - val model = copyValues(new AFTSurvivalRegressionModel(uid, coefficients, - intercept, scale).setParent(this)) - instr.logSuccess(model) - model + copyValues(new AFTSurvivalRegressionModel(uid, coefficients, intercept, scale).setParent(this)) } @Since("1.6.0") http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala index 8bcf079..018290f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala @@ -30,6 +30,7 @@ import org.apache.spark.ml.tree._ import org.apache.spark.ml.tree.DecisionTreeModelReadWrite._ import org.apache.spark.ml.tree.impl.RandomForest import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy} import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel} import org.apache.spark.rdd.RDD @@ -99,37 +100,36 @@ class DecisionTreeRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S @Since("2.0.0") def setVarianceCol(value: String): this.type = set(varianceCol, value) - override protected def train(dataset: Dataset[_]): DecisionTreeRegressionModel = { + override protected def train( + dataset: Dataset[_]): DecisionTreeRegressionModel = instrumented { instr => val categoricalFeatures: Map[Int, Int] = MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol))) val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset) val strategy = getOldStrategy(categoricalFeatures) - val instr = Instrumentation.create(this, oldDataset) - instr.logParams(params: _*) + 
instr.logPipelineStage(this) + instr.logDataset(oldDataset) + instr.logParams(this, params: _*) val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all", seed = $(seed), instr = Some(instr), parentUID = Some(uid)) - val m = trees.head.asInstanceOf[DecisionTreeRegressionModel] - instr.logSuccess(m) - m + trees.head.asInstanceOf[DecisionTreeRegressionModel] } /** (private[ml]) Train a decision tree on an RDD */ private[ml] def train( data: RDD[LabeledPoint], oldStrategy: OldStrategy, - featureSubsetStrategy: String): DecisionTreeRegressionModel = { - val instr = Instrumentation.create(this, data) - instr.logParams(params: _*) + featureSubsetStrategy: String): DecisionTreeRegressionModel = instrumented { instr => + instr.logPipelineStage(this) + instr.logDataset(data) + instr.logParams(this, params: _*) val trees = RandomForest.run(data, oldStrategy, numTrees = 1, featureSubsetStrategy, seed = $(seed), instr = Some(instr), parentUID = Some(uid)) - val m = trees.head.asInstanceOf[DecisionTreeRegressionModel] - instr.logSuccess(m) - m + trees.head.asInstanceOf[DecisionTreeRegressionModel] } /** (private[ml]) Create a Strategy instance to use with the old API. */ http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala index eb8b3c0..3305881 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala @@ -31,6 +31,7 @@ import org.apache.spark.ml.tree._ import org.apache.spark.ml.tree.impl.GradientBoostedTrees import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo} import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel} import org.apache.spark.rdd.RDD @@ -151,7 +152,7 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String) set(validationIndicatorCol, value) } - override protected def train(dataset: Dataset[_]): GBTRegressionModel = { + override protected def train(dataset: Dataset[_]): GBTRegressionModel = instrumented { instr => val categoricalFeatures: Map[Int, Int] = MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol))) @@ -168,8 +169,9 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String) val numFeatures = trainDataset.first().features.size val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Regression) - val instr = Instrumentation.create(this, dataset) - instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType, + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, labelCol, featuresCol, predictionCol, impurity, lossType, maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode, seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval, featureSubsetStrategy) instr.logNumFeatures(numFeatures) @@ -181,9 +183,7 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String) GradientBoostedTrees.run(trainDataset, boostingStrategy, $(seed), $(featureSubsetStrategy)) } - val m = new 
GBTRegressionModel(uid, baseLearners, learnerWeights, numFeatures) - instr.logSuccess(m) - m + new GBTRegressionModel(uid, baseLearners, learnerWeights, numFeatures) } @Since("1.4.0") http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 143c8a3..20878b6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -34,6 +34,7 @@ import org.apache.spark.ml.optim._ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Column, DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ @@ -373,13 +374,15 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val @Since("2.0.0") def setLinkPredictionCol(value: String): this.type = set(linkPredictionCol, value) - override protected def train(dataset: Dataset[_]): GeneralizedLinearRegressionModel = { + override protected def train( + dataset: Dataset[_]): GeneralizedLinearRegressionModel = instrumented { instr => val familyAndLink = FamilyAndLink(this) val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size - val instr = Instrumentation.create(this, dataset) - instr.logParams(labelCol, featuresCol, weightCol, offsetCol, predictionCol, linkPredictionCol, - family, solver, fitIntercept, link, maxIter, regParam, tol) + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, labelCol, featuresCol, weightCol, offsetCol, predictionCol, + linkPredictionCol, family, solver, fitIntercept, link, maxIter, regParam, tol) instr.logNumFeatures(numFeatures) if (numFeatures > WeightedLeastSquares.MAX_NUM_FEATURES) { @@ -431,7 +434,6 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val model.setSummary(Some(trainingSummary)) } - instr.logSuccess(model) model } http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index b046897..8b9233d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -27,6 +27,7 @@ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.regression.IsotonicRegressionModel.IsotonicRegressionModelWriter import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression} import org.apache.spark.mllib.regression.{IsotonicRegressionModel => MLlibIsotonicRegressionModel} import org.apache.spark.rdd.RDD @@ -161,15 +162,16 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") 
override val uid: Stri override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra) @Since("2.0.0") - override def fit(dataset: Dataset[_]): IsotonicRegressionModel = { + override def fit(dataset: Dataset[_]): IsotonicRegressionModel = instrumented { instr => transformSchema(dataset.schema, logging = true) // Extract columns from data. If dataset is persisted, do not persist oldDataset. val instances = extractWeightedLabeledPoints(dataset) val handlePersistence = dataset.storageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - val instr = Instrumentation.create(this, dataset) - instr.logParams(labelCol, featuresCol, weightCol, predictionCol, featureIndex, isotonic) + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, featureIndex, isotonic) instr.logNumFeatures(1) val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonic)) @@ -177,9 +179,7 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri if (handlePersistence) instances.unpersist() - val model = copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this)) - instr.logSuccess(model) - model + copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this)) } @Since("1.5.0") http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index c45ade9..ce6c12c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -37,6 +37,7 @@ import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.evaluation.RegressionMetrics import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.mllib.regression.{LinearRegressionModel => OldLinearRegressionModel} @@ -315,7 +316,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String def setEpsilon(value: Double): this.type = set(epsilon, value) setDefault(epsilon -> 1.35) - override protected def train(dataset: Dataset[_]): LinearRegressionModel = { + override protected def train(dataset: Dataset[_]): LinearRegressionModel = instrumented { instr => // Extract the number of features before deciding optimization solver. 
val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) @@ -326,9 +327,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String Instance(label, weight, features) } - val instr = Instrumentation.create(this, dataset) - instr.logParams(labelCol, featuresCol, weightCol, predictionCol, solver, tol, elasticNetParam, - fitIntercept, maxIter, regParam, standardization, aggregationDepth, loss, epsilon) + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, solver, tol, + elasticNetParam, fitIntercept, maxIter, regParam, standardization, aggregationDepth, loss, + epsilon) instr.logNumFeatures(numFeatures) if ($(loss) == SquaredError && (($(solver) == Auto && @@ -353,9 +356,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String model.diagInvAtWA.toArray, model.objectiveHistory) - lrModel.setSummary(Some(trainingSummary)) - instr.logSuccess(lrModel) - return lrModel + return lrModel.setSummary(Some(trainingSummary)) } val handlePersistence = dataset.storageLevel == StorageLevel.NONE @@ -415,9 +416,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String Array(0D), Array(0D)) - model.setSummary(Some(trainingSummary)) - instr.logSuccess(model) - return model + return model.setSummary(Some(trainingSummary)) } else { require($(regParam) == 0.0, "The standard deviation of the label is zero. " + "Model cannot be regularized.") @@ -596,8 +595,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String objectiveHistory) model.setSummary(Some(trainingSummary)) - instr.logSuccess(model) - model } @Since("1.4.0") http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala index 4509f85..3587572 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala @@ -29,6 +29,7 @@ import org.apache.spark.ml.tree._ import org.apache.spark.ml.tree.impl.RandomForest import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo} import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel} import org.apache.spark.rdd.RDD @@ -114,15 +115,17 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S override def setFeatureSubsetStrategy(value: String): this.type = set(featureSubsetStrategy, value) - override protected def train(dataset: Dataset[_]): RandomForestRegressionModel = { + override protected def train( + dataset: Dataset[_]): RandomForestRegressionModel = instrumented { instr => val categoricalFeatures: Map[Int, Int] = MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol))) val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset) val strategy = super.getOldStrategy(categoricalFeatures, numClasses = 0, OldAlgo.Regression, getOldImpurity) - val instr 
= Instrumentation.create(this, oldDataset) - instr.logParams(labelCol, featuresCol, predictionCol, impurity, numTrees, + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, labelCol, featuresCol, predictionCol, impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain, minInstancesPerNode, seed, subsamplingRate, cacheNodeIds, checkpointInterval) @@ -131,9 +134,8 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S .map(_.asInstanceOf[DecisionTreeRegressionModel]) val numFeatures = oldDataset.first().features.size - val m = new RandomForestRegressionModel(uid, trees, numFeatures) - instr.logSuccess(m) - m + instr.logNamedValue(Instrumentation.loggerTags.numFeatures, numFeatures) + new RandomForestRegressionModel(uid, trees, numFeatures) } @Since("1.4.0") http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index f327f37..e60a14f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -33,6 +33,7 @@ import org.apache.spark.ml.evaluation.Evaluator import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared.{HasCollectSubModels, HasParallelism} import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.util.MLUtils import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.types.StructType @@ -118,7 +119,7 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) def setCollectSubModels(value: Boolean): this.type = set(collectSubModels, value) @Since("2.0.0") - override def fit(dataset: Dataset[_]): CrossValidatorModel = { + override def fit(dataset: Dataset[_]): CrossValidatorModel = instrumented { instr => val schema = dataset.schema transformSchema(schema, logging = true) val sparkSession = dataset.sparkSession @@ -129,8 +130,9 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) // Create execution context based on $(parallelism) val executionContext = getExecutionContext - val instr = Instrumentation.create(this, dataset) - instr.logParams(numFolds, seed, parallelism) + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, numFolds, seed, parallelism) logTuningParams(instr) val collectSubModelsParam = $(collectSubModels) @@ -176,7 +178,6 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) instr.logInfo(s"Best set of parameters:\n${epm(bestIndex)}") instr.logInfo(s"Best cross-validation metric: $bestMetric.") val bestModel = est.fit(dataset, epm(bestIndex)).asInstanceOf[Model[_]] - instr.logSuccess(bestModel) copyValues(new CrossValidatorModel(uid, bestModel, metrics) .setSubModels(subModels).setParent(this)) } http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala 
b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index 14d6a69..8b25119 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -34,6 +34,7 @@ import org.apache.spark.ml.evaluation.Evaluator import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared.{HasCollectSubModels, HasParallelism} import org.apache.spark.ml.util._ +import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.types.StructType import org.apache.spark.util.ThreadUtils @@ -117,7 +118,7 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St def setCollectSubModels(value: Boolean): this.type = set(collectSubModels, value) @Since("2.0.0") - override def fit(dataset: Dataset[_]): TrainValidationSplitModel = { + override def fit(dataset: Dataset[_]): TrainValidationSplitModel = instrumented { instr => val schema = dataset.schema transformSchema(schema, logging = true) val est = $(estimator) @@ -127,8 +128,9 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St // Create execution context based on $(parallelism) val executionContext = getExecutionContext - val instr = Instrumentation.create(this, dataset) - instr.logParams(trainRatio, seed, parallelism) + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, trainRatio, seed, parallelism) logTuningParams(instr) val Array(trainingDataset, validationDataset) = @@ -172,7 +174,6 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St instr.logInfo(s"Best set of parameters:\n${epm(bestIndex)}") instr.logInfo(s"Best train validation split metric: $bestMetric.") val bestModel = est.fit(dataset, epm(bestIndex)).asInstanceOf[Model[_]] - instr.logSuccess(bestModel) copyValues(new TrainValidationSplitModel(uid, bestModel, metrics) .setSubModels(subModels).setParent(this)) } http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala index 2e43a9e..4965491 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala @@ -27,7 +27,7 @@ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.internal.Logging -import org.apache.spark.ml.{Estimator, Model, PipelineStage} +import org.apache.spark.ml.PipelineStage import org.apache.spark.ml.param.{Param, Params} import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset @@ -37,26 +37,16 @@ import org.apache.spark.util.Utils * A small wrapper that defines a training session for an estimator, and some methods to log * useful information during this session. 
*/ -private[spark] class Instrumentation extends Logging { +private[spark] class Instrumentation private () extends Logging { private val id = UUID.randomUUID() private val shortId = id.toString.take(8) - private val prefix = s"[$shortId] " - - // TODO: remove stage - var stage: Params = _ - // TODO: update spark.ml to use new Instrumentation APIs and remove this constructor - private def this(estimator: Estimator[_], dataset: RDD[_]) = { - this() - logPipelineStage(estimator) - logDataset(dataset) - } + private[util] val prefix = s"[$shortId] " /** * Log some info about the pipeline stage being fit. */ def logPipelineStage(stage: PipelineStage): Unit = { - this.stage = stage // estimator.getClass.getSimpleName can cause Malformed class name error, // call safer `Utils.getSimpleName` instead val className = Utils.getSimpleName(stage.getClass) @@ -119,13 +109,6 @@ private[spark] class Instrumentation extends Logging { logInfo(compact(render(map2jvalue(pairs.toMap)))) } - // TODO: remove this - def logParams(params: Param[_]*): Unit = { - require(stage != null, "`logStageParams` must be called before `logParams` (or an instance of" + - " Params must be provided explicitly).") - logParams(stage, params: _*) - } - def logNumFeatures(num: Long): Unit = { logNamedValue(Instrumentation.loggerTags.numFeatures, num) } @@ -166,14 +149,9 @@ private[spark] class Instrumentation extends Logging { } - // TODO: Remove this (possibly replace with logModel?) /** * Logs the successful completion of the training session. */ - def logSuccess(model: Model[_]): Unit = { - logInfo(s"training finished") - } - def logSuccess(): Unit = { logInfo("training finished") } @@ -200,22 +178,6 @@ private[spark] object Instrumentation { val varianceOfLabels = "varianceOfLabels" } - // TODO: Remove these - /** - * Creates an instrumentation object for a training session. - */ - def create(estimator: Estimator[_], dataset: Dataset[_]): Instrumentation = { - create(estimator, dataset.rdd) - } - - /** - * Creates an instrumentation object for a training session. - */ - def create(estimator: Estimator[_], dataset: RDD[_]): Instrumentation = { - new Instrumentation(estimator, dataset) - } - // end remove - def instrumented[T](body: (Instrumentation => T)): T = { val instr = new Instrumentation() Try(body(instr)) match { @@ -268,8 +230,7 @@ private[spark] object OptionalInstrumentation { * Creates an `OptionalInstrumentation` object from an existing `Instrumentation` object. */ def create(instr: Instrumentation): OptionalInstrumentation = { - new OptionalInstrumentation(Some(instr), - instr.stage.getClass.getName.stripSuffix("$")) + new OptionalInstrumentation(Some(instr), instr.prefix) } /**
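For reference, the `instrumented` entry point kept at the end of the diff takes a `body: Instrumentation => T`, allocates the session, and hands back the body's result, logging the outcome itself via the `Try(body(instr)) match` shown above. A minimal internal-usage sketch (the helper `fitSomething` and the logged values are illustrative, not from the commit):

    import org.apache.spark.ml.util.Instrumentation.instrumented

    // Hypothetical spark-internal helper: the wrapper supplies `instr` and
    // takes care of success/failure logging; the body only records metadata.
    private[spark] def fitSomething(): Int = instrumented { instr =>
      instr.logNumFeatures(10L)           // helper retained in Instrumentation above
      instr.logNamedValue("note", "toy")  // (String, String) overload, as used by OneVsRest
      42                                  // the body's result is handed back by the wrapper
    }

Centralizing success/failure logging in one wrapper is what makes it safe to delete the `logSuccess(model: Model[_])` overload and the `create` factories above.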