[GitHub] flink pull request: [FLINK-3128] [flink-ml] Add Isotonic Regressio...

2016-02-06 Thread f-sander
Github user f-sander commented on the pull request:

https://github.com/apache/flink/pull/1565#issuecomment-180798486
  
Sorry for the long delay. I still don't really have time for this, but I 
want to describe it anyway. That's why the writing and formatting are pretty 
sloppy; sorry for that, I hope you bear with me:

We only consider isotonic regression on weighted, two-dimensional data. 
Thus, data points are tuples of three doubles: `(y, x, w)`.

PAV assumes the data to be sorted by `x`. It starts on the left and moves to 
the right. Whenever two (or more) points are found whose `y` values are 
descending in `x` order, it "pools" them: all `y` values in the pool, each 
multiplied by its weight, are summed and divided by the sum of all weights. 
Any point in the pool then looks like this: `(y_weighted_pool_avg, x, w)`. 
Because the `y` values were changed, we have to look back in `x` order and 
check whether the new pool average is lower than the value before the pool. 
If that's the case, we have to pool again, until no higher `y` value is 
present before the pool.
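
To make the pooling rule concrete, here is a minimal, purely sequential 
sketch of weighted PAV on data already sorted by `x` (the helper `pav` is a 
made-up name and this is only an illustration, not the code from this PR):
```
// Minimal sequential PAV sketch on (y, x, w) tuples already sorted by x.
// Illustration only -- not the implementation in this PR.
def pav(points: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
  // A pool keeps the weighted-average y, the total weight and the range of
  // original point indices it covers.
  case class Pool(var y: Double, var w: Double, from: Int, var to: Int)
  val pools = scala.collection.mutable.ArrayBuffer.empty[Pool]
  for (((y, _, w), i) <- points.zipWithIndex) {
    pools += Pool(y, w, i, i)
    // Pool backwards as long as the previous pool's average is higher.
    while (pools.length > 1 && pools(pools.length - 2).y > pools.last.y) {
      val right = pools.remove(pools.length - 1)
      val left  = pools.last
      left.y  = (left.y * left.w + right.y * right.w) / (left.w + right.w)
      left.w += right.w
      left.to = right.to
    }
  }
  // Every point gets the weighted average of the pool that covers it.
  pools.flatMap(p => (p.from to p.to).map(i => (p.y, points(i)._2, points(i)._3))).toArray
}
```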

Any sequence of data points from `i` to `j` sharing the same `y` value is 
compressed in the following way: `(y, x_i, sum_of_weights), (y, x_j, 0)`. The 
hope of Spark's implementation is that enough data gets compressed this way 
that all remaining data fits onto one node in the last step. However, there 
are of course cases where this simply doesn't work.
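
As a tiny, made-up example of that compression:
```
// Hypothetical run of points sharing the same label y = 2.0, sorted by x: (y, x, w)
val run        = Seq((2.0, 1.0, 1.0), (2.0, 2.0, 3.0), (2.0, 5.0, 2.0))
// The first point carries the summed weight; the last one keeps x_j with weight 0.
val compressed = Seq((2.0, 1.0, 6.0), (2.0, 5.0, 0.0))
```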

Our approach (not implemented in this PR) works like this:
```
compare two consecutive data points i and j:
  if y_i < y_j: leave both untouched
  if y_i > y_j: replace both with ((y_i * w_i + y_j * w_j) / (w_i + w_j), x_i, w_i + w_j); also remember x_j
  if y_i = y_j: replace both with (y_i, x_i, w_i + w_j); also remember x_j
repeat until no pairs are combined into one
```
After the iteration has terminated: for each point that has a "remembered" 
`x_j`, add another `(y, x_j, 0)` directly behind it.
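
A minimal local sketch of one such pass (`pairwisePass` is a hypothetical 
helper, only to illustrate the rule; the real thing would of course have to 
run distributed via the join described below):
```
// One local pass of the pairwise rule over (y, x, w) points sorted by x.
// A combined point remembers the x of the swallowed right neighbour, so that
// (y, x_j, 0) can be re-added once the iteration has terminated.
def pairwisePass(points: List[(Double, Double, Double)])
    : List[((Double, Double, Double), Option[Double])] = points match {
  case (y1, x1, w1) :: (y2, x2, w2) :: rest if y1 >= y2 =>
    val y = if (y1 == y2) y1 else (y1 * w1 + y2 * w2) / (w1 + w2)
    ((y, x1, w1 + w2), Some(x2)) :: pairwisePass(rest)
  case p :: rest => (p, None) :: pairwisePass(rest)
  case Nil       => Nil
}
```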

We are able to compare each point with its successor by attaching to each 
point an index (zipWithIndex) and a "next" pointer (index + 1) and then 
doing a:
`set.join(set).where(next).equalTo(index)`
However, because of the weight summation, we must avoid that a point 
appears in multiple join pairs. Otherwise a point's weight might be summed 
into multiple combined points.
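
Roughly like this (a sketch with made-up input, assuming the `zipWithIndex` 
from Flink's DataSet utils; not the actual code of this PR):
```
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // provides zipWithIndex on DataSet

object SelfJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical input: (y, x, w) tuples, already sorted / range-partitioned by x.
    val points = env.fromElements((1.0, 0.0, 1.0), (3.0, 1.0, 1.0), (2.0, 2.0, 1.0))

    // Attach an index and a "next" pointer (index + 1) to every point.
    val indexed = points.zipWithIndex.map { case (i, p) => (i, i + 1, p) }

    // Pair every point with its successor: left "next" pointer == right index.
    val pairs = indexed.join(indexed).where(1).equalTo(0)

    pairs.print()
  }
}
```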

We worked around that by doing two joins in each iteration step:
```
step 1: left join side has only points with even indices, right side only points with odd indices
step 2: left join side has only points with odd indices, right side only points with even indices
if nothing happened during these two runs, we are done
```
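
On top of the `indexed` DataSet from the sketch above, step 1 could look 
roughly like this (step 2 just swaps the two filters):
```
// Step 1: only even indices on the left side, only odd indices on the right side.
val step1 = indexed.filter(_._1 % 2 == 0)
  .join(indexed.filter(_._1 % 2 == 1))
  .where(1)
  .equalTo(0)
```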

Unfortunately, because of the merging, the indices no longer increment by 1. 
That's why we wanted to apply another zipWithIndex after the two joins, but 
the joins repartition the data, so we lose our range partitioning, which is 
required to get indices that represent the total order of the data.

I hope you can understand the problem. Again, sorry for the sloppy writing. 




[GitHub] flink pull request: [FLINK-3128] [flink-ml] Add Isotonic Regressio...

2016-02-03 Thread f-sander
Github user f-sander commented on the pull request:

https://github.com/apache/flink/pull/1565#issuecomment-179177999
  
Thanks for your feedback!

Yes, scalability is the main issue for us too. We are not aware of any 
other parallel implementation. We also talked to the original author of 
Spark's IR implementation (which is equivalent to ours) about this, with 
the same result. However, we think we have a theoretical approach to solving 
this, but it depends on the self join without duplicates. Remember our 
discussion on the user mailing list with subject `join with no element 
appearing in multiple join-pairs`? I need that for this.

I will link a sketch of our algorithm design here in a few days if we 
haven't found a way to solve this. I guess IR won't make it into Flink 
without a fully parallelized way? 




[GitHub] flink pull request: [FLINK-3128] [flink-ml] Add Isotonic Regressio...

2016-02-03 Thread f-sander
Github user f-sander commented on the pull request:

https://github.com/apache/flink/pull/1565#issuecomment-179247125
  
There is one advantage of this over using single-node MLlib: this 
implementation contains the compression procedure used in Spark that 
combines data points with equal labels. The hope of this parallelization 
strategy is that, in each partition, enough points are compressed so that 
the combined dataset in the last step fits onto one node.

I will try to outline our algorithm tonight, but I'm very busy right now 
and can't promise. But I'll try.




[GitHub] flink pull request: [FLINK-3128] [flink-ml] Add Isotonic Regressio...

2016-02-03 Thread f-sander
Github user f-sander commented on a diff in the pull request:

https://github.com/apache/flink/pull/1565#discussion_r51723047
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/regression/IsotonicRegression.scala
 ---
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.regression
+
+import java.util.Arrays.binarySearch
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{Parameter, ParameterMap}
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, 
Predictor}
+
+import scala.collection.mutable.ArrayBuffer
+
+case class IsotonicRegressionModel(boundaries: Array[Double], predictions: 
Array[Double])
+
+/**
+  * Isotonic regression.
+  * Currently implemented using parallelized pool adjacent violators 
algorithm.
+  * Only univariate (single feature) algorithm supported.
+  *
+  * Sequential PAV implementation based on:
+  * Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
+  * "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
+  * Available from [[http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf]]
+  *
+  * Sequential PAV parallelization based on:
+  * Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
+  * "An approach to parallelizing isotonic regression."
+  * Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 
141-147.
+  * Available from 
[[http://softlib.rice.edu/pub/CRPC-TRs/reports/CRPC-TR96640.pdf]]
+  *
+  * @see [[http://en.wikipedia.org/wiki/Isotonic_regression Isotonic 
regression (Wikipedia)]]
+  *
+  *  This is a port from the implementation in Apache Spark.
+  * @example
+  * {{{
+  * val ir = IsotonicRegression()
+  *   .setIsotonic(true)
+  *
+  * val trainingDS: DataSet[(Double,Double,Double)] = ...
+  * val testingDS: DataSet[(Double)] = ...
+  *
+  * ir.fit(trainingDS)
+  *
+  * val predictions = ir.predict(testingDS)
+  *  }}}
+  *
+  * =Parameters=
+  *
+  * - [[org.apache.flink.ml.regression.IsotonicRegression.Isotonic]]:
+  * true if labels shall be ascending, false if labels shall be descending.
+  *
+  */
+class IsotonicRegression extends Predictor[IsotonicRegression] {
+
+  var isotonic = true
+
+  var model: Option[DataSet[IsotonicRegressionModel]] = None
+
+  def setIsotonic(isotonic: Boolean): this.type = {
+this.isotonic = isotonic
+this
+  }
+
+}
+
+object IsotonicRegression {
+
+  // == Parameters 
===
+
+  case object Isotonic extends Parameter[Boolean] {
+val defaultValue = Some(true)
+  }
+
+  //  Factory methods 

+
+  def apply(): IsotonicRegression = {
+new IsotonicRegression()
+  }
+
+  // == Operations 
===
+
+  class AdjacentPoolViolatersMapper extends MapFunction[Array[(Double, 
Double, Double)], Array[
+(Double, Double, Double)]] {
+
+/**
+  * Performs a pool adjacent violators algorithm (PAV).
+  * Uses approach with single processing of data where violators
+  * in previously processed data created by pooling are fixed 
immediately.
+  * Uses optimization of discovering monotonicity violating sequences 
(blocks).
+  *
+  * @param input Input data of tuples (label, feature, weight).
+  * @return Result tuples (label, feature, weight) where labels were 
updated
+  * to form a monotone sequence as per isotonic regression 
definition.

[GitHub] flink pull request: [FLINK-3128] [flink-ml] Add Isotonic Regressio...

2016-02-02 Thread f-sander
Github user f-sander commented on the pull request:

https://github.com/apache/flink/pull/1565#issuecomment-178487703
  
Are the build failures related to us? I don't really understand how...

The first failure happens in oraclejdk8 with Hadoop 2.7.1:
```
Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 351.368 sec 
<<< FAILURE! - in 
org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase

testTaskManagerProcessFailure[0](org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase)
  Time elapsed: 318.792 sec  <<< FAILURE!
java.lang.AssertionError: The program did not finish in time
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at org.junit.Assert.assertFalse(Assert.java:64)
at 
org.apache.flink.test.recovery.AbstractTaskManagerProcessFailureRecoveryTest.testTaskManagerProcessFailure(AbstractTaskManagerProcessFailureRecoveryTest.java:212)
```
The second one, in openjdk7 with Hadoop 1, appears to experience a deadlock 
(?):
```

==
Maven produced no output for 300 seconds.

==

==
The following Java processes are running (JPS)

==
2286 Launcher
77113 Jps
76276 surefirebooter4006285424712115006.jar

==
Printing stack trace of Java process 2286

==
```
After that, only lots and lots of process stack traces.




[GitHub] flink pull request: [FLINK-3128] [flink-ml] Add Isotonic Regressio...

2016-01-31 Thread f-sander
GitHub user f-sander opened a pull request:

https://github.com/apache/flink/pull/1565

[FLINK-3128] [flink-ml] Add Isotonic Regression To ML Library

Adds isotonic regression to the ml library.
It's a port of the implementation in Apache Spark.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/f-sander/flink flink3128

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1565.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1565


commit 227f91560336b860f2f982a791ae257b1a74ded8
Author: f-sander <fsan...@mailbox.tu-berlin.de>
Date:   2015-12-07T14:32:16Z

IR fit implementation

commit 7b09de3e6eac3dbe0dbaeff0c313dda821ee4ffa
Author: f-sander <fsan...@mailbox.tu-berlin.de>
Date:   2015-12-14T15:21:54Z

finished predict and fit. validated against spark

commit 0bd72afe3f0440f59fd05a8ec1265ebd4296e7cc
Author: f-sander <fsan...@mailbox.tu-berlin.de>
Date:   2016-01-09T13:51:14Z

Merge branch 'master' into flink3128

commit 6cd01a8db149c2ce06f08251c3ca6e8bef62d4c5
Author: f-sander <fsan...@mailbox.tu-berlin.de>
Date:   2016-01-10T15:49:34Z

making use of new range partitioning + implementation of antitonic 
regression

commit bcfd3b3a386459c4170fb7552e9a7af23f38c740
Author: f-sander <fsan...@mailbox.tu-berlin.de>
Date:   2016-01-24T15:32:47Z

added test cases

commit 370b68dfda734af52f978b9882acab30a72c5100
Author: Nik Hille <nik...@krutt.org>
Date:   2016-01-25T17:57:29Z

Add more IR integration tests

commit fd075ac1a9fd3d8d3eafb0fc49b62f32312df343
Author: Nik Hille <nik...@krutt.org>
Date:   2016-01-31T13:36:03Z

Fix weight bug in runIsotonicRegression()

commit 0e33df3fee74e28a65222fb31326328997da5fad
Author: f-sander <fsan...@mailbox.tu-berlin.de>
Date:   2016-01-31T13:40:20Z

cleanup

commit 990f49e935d6e40e7a2dd4e5071c83294a10c6f7
Author: Nik Hille <nik...@krutt.org>
Date:   2016-01-31T13:41:49Z

Remove unused import

commit a4776a5163c7f8621ade3d95ee733f76a79b9915
Author: Nik Hille <nik...@krutt.org>
Date:   2016-01-31T13:41:56Z

Merge branch 'flink3128' of https://github.com/f-sander/flink into flink3128

commit c5b09151a5e75a98008e1dacca48bfd38d208353
Author: f-sander <fsan...@mailbox.tu-berlin.de>
Date:   2016-01-31T13:58:12Z

    javadoc

commit 92420bec60745f6a78ee470dd9644fa4991befdd
Author: f-sander <fsan...@mailbox.tu-berlin.de>
Date:   2016-01-31T14:00:01Z

Merge branch 'master' into flink3128

commit 4ab6f5f3ee770728fc177bce454b788adb694b6a
Author: f-sander <fsan...@mailbox.tu-berlin.de>
Date:   2016-01-31T14:08:11Z

moved isotonic regression into new lib

commit dd74aeb6d501361d052798d6f461d167e0472686
Author: f-sander <fsan...@mailbox.tu-berlin.de>
Date:   2016-01-31T15:22:01Z

[FLINK-3128] Add Isotonic Regression To ML Library



