Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/15018#discussion_r92920879
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
---
@@ -328,74 +336,68 @@ class IsotonicRegression private (private var
isotonic: Boolean) extends Seriali
return Array.empty
}
- // Pools sub array within given bounds assigning weighted average
value to all elements.
- def pool(input: Array[(Double, Double, Double)], start: Int, end:
Int): Unit = {
- val poolSubArray = input.slice(start, end + 1)
-
- val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
- val weight = poolSubArray.map(_._3).sum
-
- var i = start
- while (i <= end) {
- input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
- i = i + 1
- }
+ /*
+ Keeps track of the start and end indices of the blocks.
blockBounds(start) gives the
+ index of the end of the block and blockBounds(end) gives the index of
the start of the
+ block. Entries that are not the start or end of the block are
meaningless.
+ */
+ val blockBounds = Array.range(0, input.length) // Initially, each data
point is its own block
+
+ /*
+ Keep track of the sum of weights and sum of weight * y for each block.
weights(start)
+ gives the values for the block. Entries that are not at the start of a
block
+ are meaningless.
+ */
+ val weights: Array[(Double, Double)] = input.map(x => (x._3, x._3 *
x._1)) // (weight, weight * y)
+
+ // a few convenience functions to make the code more readable
+ def blockEnd(start: Int) = blockBounds(start)
+ def blockStart(end: Int) = blockBounds(end)
+ def nextBlock(start: Int) = blockEnd(start) + 1
+ def prevBlock(start: Int) = blockStart(start - 1)
+ def merge(block1: Int, block2: Int): Int = {
+ assert(blockEnd(block1) + 1 == block2, "attempting to merge
non-consecutive blocks")
+ blockBounds(block1) = blockEnd(block2)
+ blockBounds(blockEnd(block2)) = block1
+ val w1 = weights(block1)
+ val w2 = weights(block2)
+ weights(block1) = (w1._1 + w2._1, w1._2 + w2._2)
+ block1
}
+ def average(start: Int) = weights(start)._2 / weights(start)._1
+ /*
+ Implement Algorithm PAV from [3].
+ Merge on >= instead of > because it elimnate adjacent blocks with the
same average, and we want
+ to compress our output as much as possible. Both give correct results.
+ */
var i = 0
- val len = input.length
- while (i < len) {
- var j = i
-
- // Find monotonicity violating sequence, if any.
- while (j < len - 1 && input(j)._1 > input(j + 1)._1) {
- j = j + 1
- }
-
- // If monotonicity was not violated, move to next data point.
- if (i == j) {
- i = i + 1
- } else {
- // Otherwise pool the violating sequence
- // and check if pooling caused monotonicity violation in
previously processed points.
- while (i >= 0 && input(i)._1 > input(i + 1)._1) {
- pool(input, i, j)
- i = i - 1
+ while (nextBlock(i) < input.length) {
+ if (average(i) >= average(nextBlock(i))) {
+ merge(i, nextBlock(i))
+ while((i > 0) && (average(prevBlock(i)) >= average(i))) {
--- End diff --
Super nit: space before `while` here and below. You can probably nix the
extra parens around each of the two terms, but no big deal
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]