[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-29 Thread mbalassi
Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r90032424
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -581,6 +637,16 @@ object ALS {
 val userXy = new ArrayBuffer[Array[Double]]()
 val numRatings = new ArrayBuffer[Int]()
 
+var precomputedXtX: Array[Double] = null
+
+override def open(config: Configuration): Unit = {
+  // retrieve broadcasted precomputed XtX if using implicit 
feedback
+  if (implicitPrefs) {
+precomputedXtX = 
getRuntimeContext.getBroadcastVariable[Array[Double]]("XtX")
+  .iterator().next()
+  }
+}
+
 override def coGroup(left: lang.Iterable[(Int, Int, 
Array[Array[Double]])],
--- End diff --

I agree with @gaborhermann here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-11 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87588061
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -675,7 +756,69 @@ object ALS {
   collector.collect((blockID, array))
 }
   }
-}.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+}
+
+// broadcasting XtX matrix in the implicit case
+val updatedFactorMatrix = if (implicitPrefs) {
+  newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
+} else {
+  newMatrix
+}
+
+
updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+  }
+
+  /**
+* Computes the XtX matrix for the implicit version before updating the 
factors.
+* This matrix is intended to be broadcast, but as we cannot use a sink 
inside a Flink
+* iteration, so we represent it as a [[DataSet]] with a single element 
containing the matrix.
+*
+* The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
+* then sums all these computed matrices to get `X^T * X`.
+*/
+  private[recommendation] def computeXtX(x: DataSet[(Int, 
Array[Array[Double]])], factors: Int):
+  DataSet[Array[Double]] = {
+val triangleSize = factors * (factors - 1) / 2 + factors
+
+type MtxBlock = (Int, Array[Array[Double]])
+// construct XtX for all blocks
+val xtx = x
+  .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
+var xtxForBlock: Array[Double] = null
+
+override def mapPartition(blocks: Iterable[(Int, 
Array[Array[Double]])],
+  out: Collector[Array[Double]]): Unit = {
+
+  if (xtxForBlock == null) {
+// creating the matrix if not yet created
+xtxForBlock = Array.fill(triangleSize)(0.0)
+  } else {
+// erasing the matrix
+var i = 0
+while (i < xtxForBlock.length) {
--- End diff --

Okay, you're right. I modified the code accordingly.

In this case the decision seems straightforward, but when it's not that 
clear, I agree we should do profiling. Then we could see if we could make the 
profiling easy to use. For now let's just keep this in mind.




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-11 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87587473
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -273,6 +308,14 @@ object ALS {
 val defaultValue: Option[Int] = Some(10)
   }
 
+  case object ImplicitPrefs extends Parameter[Boolean] {
--- End diff --

I suggest leaving it at 10 and not advising users to pick the highest 
possible number. It should work out of the box, and to get good results the 
user must tune the parameters to the use case (such as the lambda coefficient of 
the regularization). By the way, Spark uses 10 by default.




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-10 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87422110
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -273,6 +308,14 @@ object ALS {
 val defaultValue: Option[Int] = Some(10)
   }
 
+  case object ImplicitPrefs extends Parameter[Boolean] {
--- End diff --

You are correct, the recommendation is from the iALS paper, but I'm not sure 
if the same holds for ALS. I trust your judgment here, since I'm not as 
familiar with xALS as I should be to have a good intuition about this.




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-10 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87421513
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -675,7 +756,69 @@ object ALS {
   collector.collect((blockID, array))
 }
   }
-}.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+}
+
+// broadcasting XtX matrix in the implicit case
+val updatedFactorMatrix = if (implicitPrefs) {
+  newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
+} else {
+  newMatrix
+}
+
+
updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+  }
+
+  /**
+* Computes the XtX matrix for the implicit version before updating the 
factors.
+* This matrix is intended to be broadcast, but as we cannot use a sink 
inside a Flink
+* iteration, so we represent it as a [[DataSet]] with a single element 
containing the matrix.
+*
+* The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
+* then sums all these computed matrices to get `X^T * X`.
+*/
+  private[recommendation] def computeXtX(x: DataSet[(Int, 
Array[Array[Double]])], factors: Int):
+  DataSet[Array[Double]] = {
+val triangleSize = factors * (factors - 1) / 2 + factors
+
+type MtxBlock = (Int, Array[Array[Double]])
+// construct XtX for all blocks
+val xtx = x
+  .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
+var xtxForBlock: Array[Double] = null
+
+override def mapPartition(blocks: Iterable[(Int, 
Array[Array[Double]])],
+  out: Collector[Array[Double]]): Unit = {
+
+  if (xtxForBlock == null) {
+// creating the matrix if not yet created
+xtxForBlock = Array.fill(triangleSize)(0.0)
+  } else {
+// erasing the matrix
+var i = 0
+while (i < xtxForBlock.length) {
--- End diff --

I don't imagine this making a major difference in performance, so let's 
just go with the cleaner code angle and use `fill`.

I wish we had an easy-to-use, integrated way to do proper profiling so such 
decisions would be easier (i.e. if this is 0.5% of the CPU cost, then optimizing 
is pointless, but right now we don't know).




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-10 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87355415
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -675,7 +756,69 @@ object ALS {
   collector.collect((blockID, array))
 }
   }
-}.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+}
+
+// broadcasting XtX matrix in the implicit case
+val updatedFactorMatrix = if (implicitPrefs) {
+  newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
+} else {
+  newMatrix
+}
+
+
updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+  }
+
+  /**
+* Computes the XtX matrix for the implicit version before updating the 
factors.
+* This matrix is intended to be broadcast, but as we cannot use a sink 
inside a Flink
+* iteration, so we represent it as a [[DataSet]] with a single element 
containing the matrix.
+*
+* The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
+* then sums all these computed matrices to get `X^T * X`.
+*/
+  private[recommendation] def computeXtX(x: DataSet[(Int, 
Array[Array[Double]])], factors: Int):
+  DataSet[Array[Double]] = {
+val triangleSize = factors * (factors - 1) / 2 + factors
+
+type MtxBlock = (Int, Array[Array[Double]])
+// construct XtX for all blocks
+val xtx = x
+  .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
+var xtxForBlock: Array[Double] = null
+
+override def mapPartition(blocks: Iterable[(Int, 
Array[Array[Double]])],
+  out: Collector[Array[Double]]): Unit = {
+
+  if (xtxForBlock == null) {
+// creating the matrix if not yet created
+xtxForBlock = Array.fill(triangleSize)(0.0)
+  } else {
+// erasing the matrix
+var i = 0
+while (i < xtxForBlock.length) {
--- End diff --

I tried to avoid object creation, but I'm not sure if erasing works as 
well. By using `fill`, a new `factors * factors` matrix would be created at 
every mapping. Am I right? Maybe that's not a big problem, as there is only one 
mapping for every partition, and the matrix is not that big. Maybe it was just 
premature optimization :) We could use `fill`, because that would make the code 
cleaner. What do you think?
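
To make the trade-off concrete, here is a minimal standalone sketch in plain 
Scala (no Flink). The object `ResetSketch` and its helpers are hypothetical and 
not part of the PR. `Array.fill(n)(0.0)` allocates a fresh array on every call, 
while `java.util.Arrays.fill` zeroes an existing array in place. Note that the 
buffer in the diff has `triangleSize` elements rather than `factors * factors`.

```scala
import java.util.Arrays

object ResetSketch {
  // Size of the packed upper triangle of a factors x factors matrix,
  // using the same expression as the diff above.
  def triangleSize(factors: Int): Int = factors * (factors - 1) / 2 + factors

  // Variant 1: allocate a new zeroed array on every call.
  def freshBuffer(factors: Int): Array[Double] =
    Array.fill(triangleSize(factors))(0.0)

  // Variant 2: reuse the caller's array and zero it in place.
  def resetInPlace(buffer: Array[Double]): Array[Double] = {
    Arrays.fill(buffer, 0.0)
    buffer
  }
}
```

With 10 factors the buffer holds 55 doubles, so either variant is cheap per 
call; which one matters in practice would have to be measured.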




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-10 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87354805
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -535,8 +581,17 @@ object ALS {
 itemOut: DataSet[(Int, OutBlockInformation)],
 userIn: DataSet[(Int, InBlockInformation)],
 factors: Int,
-lambda: Double, blockIDPartitioner: FlinkPartitioner[Int]):
+lambda: Double, blockIDPartitioner: FlinkPartitioner[Int],
+implicitPrefs: Boolean,
+alpha: Double):
   DataSet[(Int, Array[Array[Double]])] = {
+// retrieve broadcast XtX matrix in implicit case
+val XtXtoBroadcast = if (implicitPrefs) {
--- End diff --

I tried to fit the notation of the explicit ALS code, as it uses the `userXtX` 
notation in the `updateFactors` function. I think it might be confusing to use 
two different notations in the code, even if the paper uses another notation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-10 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87353990
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -273,6 +308,14 @@ object ALS {
 val defaultValue: Option[Int] = Some(10)
   }
 
+  case object ImplicitPrefs extends Parameter[Boolean] {
--- End diff --

I also changed this, but note that this is for the explicit ALS algorithm 
too. Do you think it's okay to give this recommendation for the explicit case 
too? A really high number of factors (without regularization) might lead to 
overfitting.




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-10 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87353332
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -156,6 +171,26 @@ class ALS extends Predictor[ALS] {
 this
   }
 
+  /** Sets the input observations to be implicit, thus using the iALS 
algorithm for learning.
--- End diff --

Good point. Changed the wording.




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87202604
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -675,7 +756,69 @@ object ALS {
   collector.collect((blockID, array))
 }
   }
-}.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+}
+
+// broadcasting XtX matrix in the implicit case
+val updatedFactorMatrix = if (implicitPrefs) {
+  newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
+} else {
+  newMatrix
+}
+
+
updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+  }
+
+  /**
+* Computes the XtX matrix for the implicit version before updating the 
factors.
+* This matrix is intended to be broadcast, but as we cannot use a sink 
inside a Flink
+* iteration, so we represent it as a [[DataSet]] with a single element 
containing the matrix.
+*
+* The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
+* then sums all these computed matrices to get `X^T * X`.
+*/
+  private[recommendation] def computeXtX(x: DataSet[(Int, 
Array[Array[Double]])], factors: Int):
+  DataSet[Array[Double]] = {
+val triangleSize = factors * (factors - 1) / 2 + factors
+
+type MtxBlock = (Int, Array[Array[Double]])
+// construct XtX for all blocks
+val xtx = x
+  .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
+var xtxForBlock: Array[Double] = null
+
+override def mapPartition(blocks: Iterable[(Int, 
Array[Array[Double]])],
+  out: Collector[Array[Double]]): Unit = {
+
+  if (xtxForBlock == null) {
+// creating the matrix if not yet created
+xtxForBlock = Array.fill(triangleSize)(0.0)
+  } else {
+// erasing the matrix
+var i = 0
+while (i < xtxForBlock.length) {
--- End diff --

Any reason why `fill` is not/cannot be used here?




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87199446
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -273,6 +308,14 @@ object ALS {
 val defaultValue: Option[Int] = Some(10)
   }
 
+  case object ImplicitPrefs extends Parameter[Boolean] {
--- End diff --

Can't find a way to comment on line 264/299 but we should take the 
opportunity to set the default number of factors to a more reasonable 50, and 
add to the docstring and documentation the recommendation:

> we recommend working with the highest number of factors feasible within 
computational limitations.

Which comes straight from the iALS paper.




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87195483
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -156,6 +171,26 @@ class ALS extends Predictor[ALS] {
 this
   }
 
+  /** Sets the input observations to be implicit, thus using the iALS 
algorithm for learning.
--- End diff --

The docstring is not worded correctly, as the passed argument could be true 
or false.

Should be prefixed with something like "When set to true, we assume 
implicit observations..."
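
As an illustration of the suggested wording, a setter documented this way could 
look like the sketch below. `AlsSettingsSketch` is a hypothetical stand-in 
written for this example, not the actual `ALS` predictor class; only the 
docstring phrasing and the `false` default are taken from the discussion and 
the docs table in this PR.

```scala
// Hypothetical stand-in for the predictor's configuration, for illustration only.
class AlsSettingsSketch {
  private var implicitPrefs: Boolean = false

  /** When set to true, the input observations are treated as implicit
    * feedback and the implicit variant of ALS is used for learning.
    * When set to false (the default), they are treated as explicit ratings.
    *
    * @param value whether the observations are implicit
    * @return this instance, to allow chaining of setters
    */
  def setImplicit(value: Boolean): this.type = {
    implicitPrefs = value
    this
  }

  // Current value of the flag.
  def isImplicit: Boolean = implicitPrefs
}
```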




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87201508
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -535,8 +581,17 @@ object ALS {
 itemOut: DataSet[(Int, OutBlockInformation)],
 userIn: DataSet[(Int, InBlockInformation)],
 factors: Int,
-lambda: Double, blockIDPartitioner: FlinkPartitioner[Int]):
+lambda: Double, blockIDPartitioner: FlinkPartitioner[Int],
+implicitPrefs: Boolean,
+alpha: Double):
   DataSet[(Int, Array[Array[Double]])] = {
+// retrieve broadcast XtX matrix in implicit case
+val XtXtoBroadcast = if (implicitPrefs) {
--- End diff --

I'm a bit confused by the notation here. Is this matrix the `YtY` matrix 
from the paper? If yes, I would recommend sticking to the notation of the paper 
to avoid confusion.
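
Whichever name is used, the quantity is the Gram matrix of a factor matrix, 
computed as the sum of per-block products as the `computeXtX` docstring 
describes. The following is a rough sketch in plain Scala (no Flink) under 
stated assumptions: the object `GramSketch` is hypothetical, and the row-major 
packed upper-triangle layout is a guess made for this example, since the 
storage order is not visible in the quoted diff. The size expression 
`factors * (factors + 1) / 2` equals the diff's 
`factors * (factors - 1) / 2 + factors`.

```scala
object GramSketch {
  type Block = Array[Array[Double]] // each inner array is one factor row

  def triangleSize(factors: Int): Int = factors * (factors + 1) / 2

  // Adds X_i^T * X_i for one block into `acc`, stored as a packed upper
  // triangle in row-major order: (0,0), (0,1), ..., (0,f-1), (1,1), ...
  // (assumed layout, not confirmed by the PR).
  def addBlockGram(block: Block, factors: Int, acc: Array[Double]): Unit = {
    for (row <- block) {
      var pos = 0
      var i = 0
      while (i < factors) {
        var j = i
        while (j < factors) {
          acc(pos) += row(i) * row(j)
          pos += 1
          j += 1
        }
        i += 1
      }
    }
  }

  // Sums the per-block results to obtain the packed upper triangle of X^T * X.
  def gram(blocks: Seq[Block], factors: Int): Array[Double] = {
    val acc = new Array[Double](triangleSize(factors))
    blocks.foreach(addBlockGram(_, factors, acc))
    acc
  }
}
```

Because the result is a plain sum over rows, splitting the rows into blocks in 
any way gives the same matrix, which is what makes the per-partition 
computation followed by a reduction valid.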




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-09-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r81159482
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ImplicitALSTest.scala
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import org.apache.flink.ml.util.FlinkTestBase
+import org.scalatest._
+
+import scala.language.postfixOps
+import org.apache.flink.api.scala._
+import org.apache.flink.core.testutils.CommonTestUtils
+
+class ImplicitALSTest
+  extends FlatSpec
+with Matchers
+with FlinkTestBase {
+
+  override val parallelism = 2
+
+  behavior of "The modification of the alternating least squares (ALS) 
implementation" +
+"for implicit feedback datasets."
+
+  it should "properly compute Y^T * Y, and factorize matrix" in {
--- End diff --

AFAIK in the rest of the FlinkML tests we just use `val env = 
ExecutionEnvironment.getExecutionEnvironment`. I don't know if that policy has 
now changed, maybe @tillrohrmann can clarify.

For now I would say to just split the tests.




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-09-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r81159171
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -581,6 +637,16 @@ object ALS {
 val userXy = new ArrayBuffer[Array[Double]]()
 val numRatings = new ArrayBuffer[Int]()
 
+var precomputedXtX: Array[Double] = null
+
+override def open(config: Configuration): Unit = {
+  // retrieve broadcasted precomputed XtX if using implicit 
feedback
+  if (implicitPrefs) {
+precomputedXtX = 
getRuntimeContext.getBroadcastVariable[Array[Double]]("XtX")
+  .iterator().next()
+  }
+}
+
 override def coGroup(left: lang.Iterable[(Int, Int, 
Array[Array[Double]])],
--- End diff --

We can ping @tillrohrmann here; as the original author, he may have some 
input.




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-09-29 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r81115301
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ImplicitALSTest.scala
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import org.apache.flink.ml.util.FlinkTestBase
+import org.scalatest._
+
+import scala.language.postfixOps
+import org.apache.flink.api.scala._
+import org.apache.flink.core.testutils.CommonTestUtils
+
+class ImplicitALSTest
+  extends FlatSpec
+with Matchers
+with FlinkTestBase {
+
+  override val parallelism = 2
+
+  behavior of "The modification of the alternating least squares (ALS) 
implementation" +
+"for implicit feedback datasets."
+
+  it should "properly compute Y^T * Y, and factorize matrix" in {
+import ExampleMatrix._
+
+val rand = scala.util.Random
+val numBlocks = 3
+// randomly split matrix to blocks
+val blocksY = Y
+  // add a random block id to every row
+  .map { row =>
+(rand.nextInt(numBlocks), row)
+  }
+  // get the block via grouping
+  .groupBy(_._1).values
+  // add a block id (-1) to each block
+  .map(b => (-1, b.map(_._2)))
+  .toSeq
+
+// use Flink to compute YtY
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val distribBlocksY = env.fromCollection(blocksY)
+
+val YtY = ALS
+  .computeXtX(distribBlocksY, factors)
+  .collect().head
+
+// check YtY size
+YtY.length should be (factors * (factors - 1) / 2 + factors)
+
+// check result is as expected
+expectedUpperTriangleYtY
+  .zip(YtY)
+  .foreach { case (expected, result) =>
+result should be (expected +- 0.1)
+  }
+
+// temporary directory to avoid too few memory segments
+val tempDir = CommonTestUtils.getTempDir + "/"
+
+// factorize matrix with implicit ALS
+val als = ALS()
+  .setIterations(iterations)
+  .setLambda(lambda)
+  .setBlocks(blocks)
+  .setNumFactors(factors)
+  .setImplicit(true)
+  .setAlpha(alpha)
+  .setSeed(seed)
+  .setTemporaryPath(tempDir)
+
+val inputDS = env.fromCollection(implicitRatings)
+
+als.fit(inputDS)
+
+// check predictions on some user-item pairs
+val testData = env.fromCollection(expectedResult.map{
+  case (userID, itemID, rating) => (userID, itemID)
+})
+
+val predictions = als.predict(testData).collect()
+
+predictions.length should equal(expectedResult.length)
+
+val resultMap = expectedResult map {
+  case (uID, iID, value) => (uID, iID) -> value
+} toMap
+
+predictions foreach {
+  case (uID, iID, value) => {
+resultMap.isDefinedAt((uID, iID)) should be(true)
+
+value should be(resultMap((uID, iID)) +- 1e-5)
+  }
+}
+
+  }
+
+}
+
+object ExampleMatrix {
--- End diff --

Thanks. Moved the data to `Recommendation.scala`.




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-09-29 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r81115229
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ImplicitALSTest.scala
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import org.apache.flink.ml.util.FlinkTestBase
+import org.scalatest._
+
+import scala.language.postfixOps
+import org.apache.flink.api.scala._
+import org.apache.flink.core.testutils.CommonTestUtils
+
+class ImplicitALSTest
+  extends FlatSpec
+with Matchers
+with FlinkTestBase {
+
+  override val parallelism = 2
+
+  behavior of "The modification of the alternating least squares (ALS) 
implementation" +
+"for implicit feedback datasets."
+
+  it should "properly compute Y^T * Y, and factorize matrix" in {
--- End diff --

Yes, you are right, it should be split into two tests.
I kept it in one only because, if I remember correctly, there was a rule 
not to initialize two `ExecutionEnvironment`s in one test (probably because of 
taking test performance into consideration). I did not see, however, how to 
initialize the environment once, as I would have had to override the `before` 
function implemented in `FlinkTestBase`.

What do you think would be the best solution? Overriding `before`, using 
two different `ExecutionEnvironment`s, or keeping it this way (in one test)?




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-09-29 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r81114406
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -581,6 +637,16 @@ object ALS {
 val userXy = new ArrayBuffer[Array[Double]]()
 val numRatings = new ArrayBuffer[Int]()
 
+var precomputedXtX: Array[Double] = null
+
+override def open(config: Configuration): Unit = {
+  // retrieve broadcasted precomputed XtX if using implicit 
feedback
+  if (implicitPrefs) {
+precomputedXtX = 
getRuntimeContext.getBroadcastVariable[Array[Double]]("XtX")
+  .iterator().next()
+  }
+}
+
 override def coGroup(left: lang.Iterable[(Int, Int, 
Array[Array[Double]])],
--- End diff --

If I see it right, I did not change this line, it was in the original ALS 
implementation. However, I can't find any reason to use the Java `Iterable`.

There could be other minor things to refactor in the original ALS code, but 
I preferred to keep them as they were, not to break anything. Should I refactor 
some parts along the way when I extend an algorithm like this?
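
One possible reason for the Java type, offered as an assumption rather than 
something confirmed in this thread: when a Scala class overrides a method of a 
Java interface whose parameters are declared as `java.lang.Iterable`, the 
signature has to keep that type, and any conversion to a Scala collection 
happens inside the body. The sketch below shows that pattern with a 
hypothetical helper `sumRatings`; it assumes Scala 2.13 or later for 
`scala.jdk.CollectionConverters` (older versions would use 
`scala.collection.JavaConverters` instead).

```scala
import java.lang.{Iterable => JIterable}
import scala.jdk.CollectionConverters._

object IterableSketch {
  // Hypothetical helper: the parameter keeps the Java type that a Java
  // interface method would dictate; the conversion happens in the body.
  def sumRatings(ratings: JIterable[(Int, Double)]): Double =
    ratings.asScala.map(_._2).sum
}
```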




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-09-29 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r81112771
  
--- Diff: docs/dev/libs/ml/als.md ---
@@ -99,6 +114,26 @@ The alternating least squares implementation can be 
controlled by the following
 
   
   
+ImplicitPrefs
+
+  
+Implicit property of the observations, meaning that they do 
not represent an explicit
+preference of the user, just the implicit information how many 
times the user consumed the
+(Default value: false)
+  
+
+  
+  
+Alpha
+
+  
+Weight of the positive implicit observations. Should be 
non-negative.
+Only relevant when ImplicitPrefs is set to true.
+(Default value: 1)
--- End diff --

I guess it was left as default 1 for no reason, I don't have any motivation 
for it. I changed it to 40 according to the paper. Thanks for the suggestion!
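
For context on what `alpha` controls, the paper (Hu et al.) turns each raw 
observation `r` into a binary preference and a confidence weight 
`c = 1 + alpha * r`. The sketch below only restates those two formulas in 
plain Scala; `ConfidenceSketch` is a hypothetical name, and whether this PR 
applies the formulas in exactly this form is an assumption.

```scala
object ConfidenceSketch {
  // Binary preference p_ui: 1 if any interaction was observed, else 0.
  def preference(r: Double): Double = if (r > 0.0) 1.0 else 0.0

  // Confidence c_ui = 1 + alpha * r_ui, as defined in the paper.
  def confidence(alpha: Double, r: Double): Double = 1.0 + alpha * r
}
```

With `alpha = 40`, two observed interactions give a confidence of 81, whereas 
with `alpha = 1` they give 3, so the default noticeably changes how strongly 
observed items outweigh unobserved ones.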




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-09-29 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r81112526
  
--- Diff: docs/dev/libs/ml/als.md ---
@@ -99,6 +114,26 @@ The alternating least squares implementation can be 
controlled by the following
 
   
   
+ImplicitPrefs
+
+  
+Implicit property of the observations, meaning that they do 
not represent an explicit
+preference of the user, just the implicit information how many 
times the user consumed the
--- End diff --

Thanks. Fixed.




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-09-29 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r81112516
  
--- Diff: docs/dev/libs/ml/als.md ---
@@ -49,6 +49,21 @@ By applying this step alternately to the matrices $U$ 
and $V$, we can iterativel
 
 The matrix $R$ is given in its sparse representation as a tuple of $(i, j, 
r)$ where $i$ denotes the row index, $j$ the column index and $r$ is the matrix 
value at position $(i,j)$.
 
+An alternative model can be used for _implicit feedback_ datasets.
+These datasets only contain implicit feedback from the user
+in contrast to datasets with explicit feedback like movie ratings.
+For example users watch videos on a website and the website monitors which 
user
+viewed which video, so the users only provide their preference implicitly.
+In these cases the feedback should not be treated as a
+rating, but rather an evidence that the user prefers that item.
+Thus, for implicit feedback datasets there is a slightly different
+minimalization problem to solve (see [Hu et 
al.](http://dx.doi.org/10.1109/ICDM.2008.22) for details).
--- End diff --

Thanks. Changed.

