This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f92c827acab [SPARK-41008][MLLIB] Follow-up isotonic regression features deduplica…
f92c827acab is described below

commit f92c827acabccf547d5a1dff4f7ec371bc370230
Author: Ahmed Mahran <ahmed.mah...@mashin.io>
AuthorDate: Sun Dec 11 15:01:15 2022 -0600

    [SPARK-41008][MLLIB] Follow-up isotonic regression features deduplica…

    ### What changes were proposed in this pull request?

    A follow-up on https://github.com/apache/spark/pull/38966 to update relevant documentation and remove the redundant sort key.

    ### Why are the changes needed?

    For isotonic regression, another method for breaking ties of repeated features was introduced in https://github.com/apache/spark/pull/38966. This will aggregate points having the same feature value by computing the weighted average of the labels.
    - This only requires points to be sorted by features instead of features and labels. So, we should remove the label as a secondary sort key.
    - Isotonic regression documentation needs to be updated to reflect the new behavior.

    ### Does this PR introduce _any_ user-facing change?

    Isotonic regression documentation update. The documentation described the behavior of the algorithm when there are points in the input with repeated features. Since this behavior has changed, the documentation needs to describe the new behavior.

    ### How was this patch tested?

    Existing tests passed. No need to add new tests since existing tests are already comprehensive.

    srowen

    Closes #38996 from ahmed-mahran/ml-isotonic-reg-dups-follow-up.

Authored-by: Ahmed Mahran <ahmed.mah...@mashin.io> Signed-off-by: Sean Owen <sro...@gmail.com> --- docs/mllib-isotonic-regression.md | 18 ++--- .../mllib/regression/IsotonicRegression.scala | 82 ++++++++++------------ .../mllib/regression/IsotonicRegressionSuite.scala | 29 +++++--- 3 files changed, 67 insertions(+), 62 deletions(-) diff --git a/docs/mllib-isotonic-regression.md b/docs/mllib-isotonic-regression.md index 95be32a819e..711e828bd80 100644 --- a/docs/mllib-isotonic-regression.md +++ b/docs/mllib-isotonic-regression.md @@ -43,7 +43,14 @@ best fitting the original data points. which uses an approach to [parallelizing isotonic regression](https://doi.org/10.1007/978-3-642-99789-1_10). The training input is an RDD of tuples of three double values that represent -label, feature and weight in this order. Additionally, IsotonicRegression algorithm has one +label, feature and weight in this order. In case there are multiple tuples with +the same feature then these tuples are aggregated into a single tuple as follows: + +* Aggregated label is the weighted average of all labels. +* Aggregated feature is the unique feature value. +* Aggregated weight is the sum of all weights. + +Additionally, IsotonicRegression algorithm has one optional parameter called $isotonic$ defaulting to true. This argument specifies if the isotonic regression is isotonic (monotonically increasing) or antitonic (monotonically decreasing). @@ -53,17 +60,12 @@ labels for both known and unknown features. The result of isotonic regression is treated as piecewise linear function. The rules for prediction therefore are: * If the prediction input exactly matches a training feature - then associated prediction is returned. In case there are multiple predictions with the same - feature then one of them is returned. Which one is undefined - (same as java.util.Arrays.binarySearch). + then associated prediction is returned. 
* If the prediction input is lower or higher than all training features then prediction with lowest or highest feature is returned respectively. - In case there are multiple predictions with the same feature - then the lowest or highest is returned respectively. * If the prediction input falls between two training features then prediction is treated as piecewise linear function and interpolated value is calculated from the - predictions of the two closest features. In case there are multiple values - with the same feature then the same rules as in previous point are used. + predictions of the two closest features. ### Examples diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 0b2bf147501..fbf0dc9c357 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -23,7 +23,6 @@ import java.util.Arrays.binarySearch import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.apache.commons.math3.util.Precision import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -272,8 +271,8 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali * @param input RDD of tuples (label, feature, weight) where label is dependent variable * for which we calculate isotonic regression, feature is independent variable * and weight represents number of measures with default 1. - * If multiple labels share the same feature value then they are ordered before - * the algorithm is executed. + * If multiple labels share the same feature value then they are aggregated using + * the weighted average before the algorithm is executed. * @return Isotonic regression model. 
*/ @Since("1.3.0") @@ -298,8 +297,8 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali * @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable * for which we calculate isotonic regression, feature is independent variable * and weight represents number of measures with default 1. - * If multiple labels share the same feature value then they are ordered before - * the algorithm is executed. + * If multiple labels share the same feature value then they are aggregated using + * the weighted average before the algorithm is executed. * @return Isotonic regression model. */ @Since("1.3.0") @@ -310,21 +309,14 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali /** * Aggregates points of duplicate feature values into a single point using as label the weighted * average of the labels of the points with duplicate feature values. All points for a unique - * feature values are aggregated as: + * feature value are aggregated as: * - * - Aggregated label is the weighted average of all labels - * - Aggregated feature is the weighted average of all equal features[1] - * - Aggregated weight is the sum of all weights + * - Aggregated label is the weighted average of all labels. + * - Aggregated feature is the unique feature value. + * - Aggregated weight is the sum of all weights. * - * [1] Note: It is possible that feature values to be equal up to a resolution due to - * representation errors, since we cannot know which feature value to use in that case, we - * compute the weighted average of the features. Ideally, all feature values will be equal and - * the weighted average is just the value at any point. - * - * @param input - * Input data of tuples (label, feature, weight). Weights must be non-negative. - * @return - * Points with unique feature values. + * @param input Input data of tuples (label, feature, weight). Weights must be non-negative. 
+ * @return Points with unique feature values. */ private[regression] def makeUnique( input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = { @@ -339,28 +331,28 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali if (cleanInput.length <= 1) { cleanInput } else { - // whether or not two double features are equal up to a precision - @inline def areEqual(a: Double, b: Double): Boolean = Precision.equals(a, b) - val pointsAccumulator = new IsotonicRegression.PointsAccumulator - var (_, prevFeature, _) = cleanInput.head - - // Go through input points, merging all points with approximately equal feature values into - // a single point. Equality of features is defined by areEqual method. The label of the - // accumulated points is the weighted average of the labels of all points of equal feature - // value. It is possible that feature values to be equal up to a resolution due to - // representation errors, since we cannot know which feature value to use in that case, - // we compute the weighted average of the features. - cleanInput.foreach { case point @ (_, feature, _) => - if (areEqual(feature, prevFeature)) { + + // Go through input points, merging all points with equal feature values into a single point. + // Equality of features is defined by shouldAccumulate method. The label of the accumulated + // points is the weighted average of the labels of all points of equal feature value. 
+ + // Initialize with first point + pointsAccumulator := cleanInput.head + // Accumulate the rest + cleanInput.tail.foreach { case point @ (_, feature, _) => + if (pointsAccumulator.shouldAccumulate(feature)) { + // Still on a duplicate feature, accumulate pointsAccumulator += point } else { + // A new unique feature encountered: + // - append the last accumulated point to unique features output pointsAccumulator.appendToOutput() + // - and reset pointsAccumulator := point } - prevFeature = feature } - // Append the last accumulated point + // Append the last accumulated point to unique features output pointsAccumulator.appendToOutput() pointsAccumulator.getOutput } @@ -488,14 +480,14 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali // Points with same or adjacent features must collocate within the same partition. .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput)) .values - // Lexicographically sort points by features then labels. - .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1)))) + // Lexicographically sort points by features. + .mapPartitions(p => Iterator(p.toArray.sortBy(_._2))) // Aggregate points with equal features into a single point. .map(makeUnique) .flatMap(poolAdjacentViolators) .collect() // Sort again because collect() doesn't promise ordering. - .sortBy(x => (x._2, x._1)) + .sortBy(_._2) poolAdjacentViolators(parallelStepResult) } } @@ -511,30 +503,32 @@ object IsotonicRegression { private var (currentLabel: Double, currentFeature: Double, currentWeight: Double) = (0d, 0d, 0d) + /** Whether or not this feature exactly equals the current accumulated feature. */ + @inline def shouldAccumulate(feature: Double): Boolean = currentFeature == feature + /** Resets the current value of the point accumulator using the provided point. 
*/ - def :=(point: (Double, Double, Double)): Unit = { + @inline def :=(point: (Double, Double, Double)): Unit = { val (label, feature, weight) = point currentLabel = label * weight - currentFeature = feature * weight + currentFeature = feature currentWeight = weight } /** Accumulates the provided point into the current value of the point accumulator. */ - def +=(point: (Double, Double, Double)): Unit = { - val (label, feature, weight) = point + @inline def +=(point: (Double, Double, Double)): Unit = { + val (label, _, weight) = point currentLabel += label * weight - currentFeature += feature * weight currentWeight += weight } /** Appends the current value of the point accumulator to the output. */ - def appendToOutput(): Unit = + @inline def appendToOutput(): Unit = output += (( currentLabel / currentWeight, - currentFeature / currentWeight, + currentFeature, currentWeight)) /** Returns all accumulated points so far. */ - def getOutput: Array[(Double, Double, Double)] = output.toArray + @inline def getOutput: Array[(Double, Double, Double)] = output.toArray } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index b59d16be6cd..a206e922e5f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.mllib.regression -import org.apache.commons.math3.util.Precision import org.scalatest.matchers.must.Matchers import org.apache.spark.{SparkException, SparkFunSuite} @@ -225,12 +224,18 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w test("SPARK-41008 isotonic regression with duplicate features differs from sklearn") { val model = runIsotonicRegressionOnInput( - Seq((2, 1, 1), (1, 1, 1), (0, 2, 1), (1, 2, 1), (0.5, 3, 1), (0, 3, 1)), 
+ Seq((1, 0.6, 1), (0, 0.6, 1), + (0, 1.0 / 3, 1), (1, 1.0 / 3, 1), (0, 1.0 / 3, 1), + (1, 0.2, 1), (0, 0.2, 1), (0, 0.2, 1), (0, 0.2, 1)), true, 2) - assert(model.boundaries === Array(1.0, 3.0)) - assert(model.predictions === Array(0.75, 0.75)) + assert(model.boundaries === Array(0.2, 1.0 / 3, 0.6)) + assert(model.predictions === Array(0.25, 1.0 / 3, 0.5)) + + assert(model.predict(0.6) === 0.5) + assert(model.predict(1.0 / 3) === 1.0 / 3) + assert(model.predict(0.2) === 0.25) } test("isotonic regression prediction") { @@ -327,9 +332,8 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w test("makeUnique: handle duplicate features") { val regressor = new IsotonicRegression() import regressor.makeUnique - import Precision.EPSILON - // Note: input must be lexicographically sorted by (feature, label) + // Note: input must be lexicographically sorted by feature // empty assert(makeUnique(Array.empty) === Array.empty) @@ -373,9 +377,14 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w (10.0 * 0.3 + 20.0 * 0.3 + 30.0 * 0.4, 2.0, 1.0), (10.0, 3.0, 1.0))) - // duplicate up to resolution error - assert( - makeUnique(Array((1.0, 1.0, 1.0), (1.0, 1.0 - EPSILON, 1.0), (1.0, 1.0 + EPSILON, 1.0))) === - Array((1.0, 1.0, 3.0))) + // don't handle tiny representation errors + // e.g. infinitely adjacent doubles are already unique + val adjacentDoubles = { + // i-th next representable double to 1.0 is java.lang.Double.longBitsToDouble(base + i) + val base = java.lang.Double.doubleToRawLongBits(1.0) + (0 until 10).map(i => java.lang.Double.longBitsToDouble(base + i)) + .map((1.0, _, 1.0)).toArray + } + assert(makeUnique(adjacentDoubles) === adjacentDoubles) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org