This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 84345c7  [SPARK-30662][ML][PYSPARK] Put back the API changes for HasBlockSize in ALS/MLP

84345c7 is described below

commit 84345c7e67c9dfd47ec76d5a3d2ad76b62f959b6
Author: Huaxin Gao <huax...@us.ibm.com>
AuthorDate: Sun Feb 9 13:14:30 2020 +0800

    [SPARK-30662][ML][PYSPARK] Put back the API changes for HasBlockSize in ALS/MLP

    ### What changes were proposed in this pull request?
    Add ```HasBlockSize``` to the shared Params in both Scala and Python, and make ALS/MLP extend ```HasBlockSize```.

    ### Why are the changes needed?
    Adding ```HasBlockSize``` to ALS lets users specify the blockSize. Making ```HasBlockSize``` a shared param lets both ALS and MLP use it.

    ### Does this PR introduce any user-facing change?
    Yes:
    ```ALS.setBlockSize/getBlockSize```
    ```ALSModel.setBlockSize/getBlockSize```

    ### How was this patch tested?
    Manually tested. Also added doctests.

    Closes #27501 from huaxingao/spark_30662.

    Authored-by: Huaxin Gao <huax...@us.ibm.com>
    Signed-off-by: zhengruifeng <ruife...@foxmail.com>
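For illustration only (not part of the commit), a minimal PySpark sketch of the new user-facing API; it assumes an active SparkSession bound to `spark`, and the tiny ratings DataFrame is made up:

```python
from pyspark.ml.recommendation import ALS

# Toy ratings data; any (user, item, rating) DataFrame works.
df = spark.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (2, 1, 5.0)],
    ["user", "item", "rating"])

als = ALS(userCol="user", itemCol="item", ratingCol="rating")
als.setBlockSize(2048)    # new setter; the shared param defaults to 4096
model = als.fit(df)
model.getBlockSize()      # 2048 -- fit() copies the value onto the ALSModel
```

As the Python diff below shows, blockSize can also be passed straight to the ALS constructor or to setParams.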
---
 .../MultilayerPerceptronClassifier.scala           | 22 +----
 .../ml/param/shared/SharedParamsCodeGen.scala      |  6 ++-
 .../spark/ml/param/shared/sharedParams.scala       | 17 ++++++++
 .../org/apache/spark/ml/recommendation/ALS.scala   | 46 ++++++++++++++++------
 python/pyspark/ml/classification.py                | 22 ++++-------
 python/pyspark/ml/param/_shared_params_code_gen.py |  5 ++-
 python/pyspark/ml/param/shared.py                  | 17 ++++++++
 python/pyspark/ml/recommendation.py                | 29 +++++++++++---
 8 files changed, 109 insertions(+), 55 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index c7a8237..6e8f92b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
 
 /** Params for Multilayer Perceptron. */
 private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
-  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {
+  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
 
   import MultilayerPerceptronClassifier._
 
@@ -55,26 +55,6 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticCl
   final def getLayers: Array[Int] = $(layers)
 
   /**
-   * Block size for stacking input data in matrices to speed up the computation.
-   * Data is stacked within partitions. If block size is more than remaining data in
-   * a partition then it is adjusted to the size of this data.
-   * Recommended size is between 10 and 1000.
-   * Default: 128
-   *
-   * @group expertParam
-   */
-  @Since("1.5.0")
-  final val blockSize: IntParam = new IntParam(this, "blockSize",
-    "Block size for stacking input data in matrices. Data is stacked within partitions." +
-      " If block size is more than remaining data in a partition then " +
-      "it is adjusted to the size of this data. Recommended size is between 10 and 1000",
-    ParamValidators.gt(0))
-
-  /** @group expertGetParam */
-  @Since("1.5.0")
-  final def getBlockSize: Int = $(blockSize)
-
-  /**
    * The solver algorithm for optimization.
    * Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
    * Default: "l-bfgs"
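The net effect on the MLP API is behavior-preserving: the param and its getter now come from the shared trait instead of being declared locally. A short sketch mirroring the doctest added further down:

```python
from pyspark.ml.classification import MultilayerPerceptronClassifier

mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
mlp.getBlockSize()   # 128 -- the default, now supplied via HasBlockSize
mlp.setBlockSize(1)  # existing setter keeps working
mlp.getBlockSize()   # 1
```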
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 7ac680e..6194dfa 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -104,7 +104,11 @@ private[shared] object SharedParamsCodeGen {
       isValid = "ParamValidators.inArray(Array(\"euclidean\", \"cosine\"))"),
     ParamDesc[String]("validationIndicatorCol", "name of the column that indicates whether " +
       "each row is for training or for validation. False indicates training; true indicates " +
-      "validation.")
+      "validation."),
+    ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " +
+      "stacked within partitions. If block size is more than remaining data in a partition " +
+      "then it is adjusted to the size of this data.",
+      isValid = "ParamValidators.gt(0)", isExpertParam = true)
   )
 
   val code = genSharedParams(params)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
index 44c993e..0c0d2b5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -578,4 +578,21 @@ trait HasValidationIndicatorCol extends Params {
   /** @group getParam */
   final def getValidationIndicatorCol: String = $(validationIndicatorCol)
 }
+
+/**
+ * Trait for shared param blockSize. This trait may be changed or
+ * removed between minor versions.
+ */
+@DeveloperApi
+trait HasBlockSize extends Params {
+
+  /**
+   * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data..
+   * @group expertParam
+   */
+  final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", ParamValidators.gt(0))
+
+  /** @group expertGetParam */
+  final def getBlockSize: Int = $(blockSize)
+}
 // scalastyle:on
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 2fb9a27..002146f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -54,7 +54,8 @@ import org.apache.spark.util.random.XORShiftRandom
 /**
  * Common params for ALS and ALSModel.
  */
-private[recommendation] trait ALSModelParams extends Params with HasPredictionCol {
+private[recommendation] trait ALSModelParams extends Params with HasPredictionCol
+  with HasBlockSize {
   /**
    * Param for the column name for user ids. Ids must be integers. Other
    * numeric types are supported for this column, but will be cast to integers as long as they
@@ -125,6 +126,8 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo
 
   /** @group expertGetParam */
   def getColdStartStrategy: String = $(coldStartStrategy).toLowerCase(Locale.ROOT)
+
+  setDefault(blockSize -> 4096)
 }
 
 /**
@@ -288,6 +291,15 @@ class ALSModel private[ml] (
   @Since("2.2.0")
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
 
+  /**
+   * Set block size for stacking input data in matrices.
+   * Default is 4096.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.0.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+
   private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) =>
     if (featuresA != null && featuresB != null) {
       var dotProduct = 0.0f
@@ -351,7 +363,7 @@ class ALSModel private[ml] (
    */
   @Since("2.2.0")
   def recommendForAllUsers(numItems: Int): DataFrame = {
-    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
+    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
   }
 
   /**
@@ -366,7 +378,7 @@ class ALSModel private[ml] (
   @Since("2.3.0")
   def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
-    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
+    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
   }
 
   /**
@@ -377,7 +389,7 @@ class ALSModel private[ml] (
    */
   @Since("2.2.0")
   def recommendForAllItems(numUsers: Int): DataFrame = {
-    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
+    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
   }
 
   /**
@@ -392,7 +404,7 @@ class ALSModel private[ml] (
   @Since("2.3.0")
   def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
-    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
+    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
   }
 
   /**
@@ -441,11 +453,12 @@ class ALSModel private[ml] (
       dstFactors: DataFrame,
       srcOutputColumn: String,
       dstOutputColumn: String,
-      num: Int): DataFrame = {
+      num: Int,
+      blockSize: Int): DataFrame = {
     import srcFactors.sparkSession.implicits._
 
-    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
-    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
+    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
+    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
     val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
       .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
       .flatMap { case (srcIter, dstIter) =>
@@ -483,11 +496,10 @@ class ALSModel private[ml] (
 
   /**
    * Blockifies factors to improve the efficiency of cross join
-   * TODO: SPARK-20443 - expose blockSize as a param?
    */
   private def blockify(
       factors: Dataset[(Int, Array[Float])],
-      blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
+      blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
     import factors.sparkSession.implicits._
     factors.mapPartitions(_.grouped(blockSize))
   }
@@ -655,6 +667,15 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
 
   /**
+   * Set block size for stacking input data in matrices.
+   * Default is 4096.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.0.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+
+  /**
    * Sets both numUserBlocks and numItemBlocks to the specific value.
    *
    * @group setParam
@@ -683,7 +704,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
     instr.logDataset(dataset)
     instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
       itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
-      seed, intermediateStorageLevel, finalStorageLevel)
+      seed, intermediateStorageLevel, finalStorageLevel, blockSize)
 
     val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
       numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
@@ -694,7 +715,8 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
       checkpointInterval = $(checkpointInterval), seed = $(seed))
     val userDF = userFactors.toDF("id", "features")
     val itemDF = itemFactors.toDF("id", "features")
-    val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
+    val model = new ALSModel(uid, $(rank), userDF, itemDF).setBlockSize($(blockSize))
+      .setParent(this)
     copyValues(model)
   }
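Because blockify now receives the param value instead of a hard-coded 4096, the model-side setter feeds directly into every recommend* path. A hedged sketch of the effect, reusing the fitted `model` from the earlier example:

```python
# blockSize controls how many factor rows are grouped into each block
# before the blocked cross join inside recommendForAll.
model.setBlockSize(2048)
user_recs = model.recommendForAllUsers(10)   # top-10 items per user
item_recs = model.recommendForAllItems(10)   # top-10 users per item
```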
- """ - return self.getOrDefault(self.blockSize) - @since("2.0.0") def getInitialWeights(self): """ @@ -2211,11 +2199,17 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPer ... (1.0, Vectors.dense([0.0, 1.0])), ... (1.0, Vectors.dense([1.0, 0.0])), ... (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"]) - >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123) + >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123) >>> mlp.setMaxIter(100) MultilayerPerceptronClassifier... >>> mlp.getMaxIter() 100 + >>> mlp.getBlockSize() + 128 + >>> mlp.setBlockSize(1) + MultilayerPerceptronClassifier... + >>> mlp.getBlockSize() + 1 >>> model = mlp.fit(df) >>> model.setFeaturesCol("features") MultilayerPerceptronClassificationModel... diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index ded3ca8..2086e83 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -164,7 +164,10 @@ if __name__ == "__main__": "'euclidean'", "TypeConverters.toString"), ("validationIndicatorCol", "name of the column that indicates whether each row is for " + "training or for validation. False indicates training; true indicates validation.", - None, "TypeConverters.toString")] + None, "TypeConverters.toString"), + ("blockSize", "block size for stacking input data in matrices. Data is stacked within " + "partitions. If block size is more than remaining data in a partition then it is " + "adjusted to the size of this data.", None, "TypeConverters.toInt")] code = [] for name, doc, defaultValueStr, typeConverter in shared: diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index 8fc1156..24fb0d3 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -580,3 +580,20 @@ class HasValidationIndicatorCol(Params): Gets the value of validationIndicatorCol or its default value. """ return self.getOrDefault(self.validationIndicatorCol) + + +class HasBlockSize(Params): + """ + Mixin for param blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. + """ + + blockSize = Param(Params._dummy(), "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", typeConverter=TypeConverters.toInt) + + def __init__(self): + super(HasBlockSize, self).__init__() + + def getBlockSize(self): + """ + Gets the value of blockSize or its default value. + """ + return self.getOrDefault(self.blockSize) diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py index ee27696..99d80aa 100644 --- a/python/pyspark/ml/recommendation.py +++ b/python/pyspark/ml/recommendation.py @@ -28,7 +28,7 @@ __all__ = ['ALS', 'ALSModel'] @inherit_doc -class _ALSModelParams(HasPredictionCol): +class _ALSModelParams(HasPredictionCol, HasBlockSize): """ Params for :py:class:`ALS` and :py:class:`ALSModel`. 
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
index ee27696..99d80aa 100644
--- a/python/pyspark/ml/recommendation.py
+++ b/python/pyspark/ml/recommendation.py
@@ -28,7 +28,7 @@ __all__ = ['ALS', 'ALSModel']
 
 
 @inherit_doc
-class _ALSModelParams(HasPredictionCol):
+class _ALSModelParams(HasPredictionCol, HasBlockSize):
     """
     Params for :py:class:`ALS` and :py:class:`ALSModel`.
 
@@ -223,6 +223,8 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
     0.1
     >>> als.clear(als.regParam)
     >>> model = als.fit(df)
+    >>> model.getBlockSize()
+    4096
     >>> model.getUserCol()
     'user'
     >>> model.setUserCol("user")
@@ -282,13 +284,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
                  implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                  ratingCol="rating", nonnegative=False, checkpointInterval=10,
                  intermediateStorageLevel="MEMORY_AND_DISK",
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
         """
         __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                  implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                  ratingCol="rating", nonnegative=false, checkpointInterval=10, \
                  intermediateStorageLevel="MEMORY_AND_DISK", \
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
         """
         super(ALS, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
@@ -296,7 +298,8 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
                          implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",
                          ratingCol="rating", nonnegative=False, checkpointInterval=10,
                          intermediateStorageLevel="MEMORY_AND_DISK",
-                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan",
+                         blockSize=4096)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -306,13 +309,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
                   implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                   ratingCol="rating", nonnegative=False, checkpointInterval=10,
                   intermediateStorageLevel="MEMORY_AND_DISK",
-                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
+                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
         """
         setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                   implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                   ratingCol="rating", nonnegative=False, checkpointInterval=10, \
                   intermediateStorageLevel="MEMORY_AND_DISK", \
-                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
         Sets params for ALS.
         """
         kwargs = self._input_kwargs
@@ -443,6 +446,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
         """
         return self._set(seed=value)
 
+    @since("3.0.0")
+    def setBlockSize(self, value):
+        """
+        Sets the value of :py:attr:`blockSize`.
+        """
+        return self._set(blockSize=value)
+
 
 class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
     """
@@ -479,6 +489,13 @@ class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
         """
         return self._set(predictionCol=value)
 
+    @since("3.0.0")
+    def setBlockSize(self, value):
+        """
+        Sets the value of :py:attr:`blockSize`.
+        """
+        return self._set(blockSize=value)
+
     @property
     @since("1.4.0")
     def rank(self):

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org