[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...

2016-11-24 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r89503899
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala ---
@@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters {
     */
   def evaluate[Testing, PredictionValue](
       testing: DataSet[Testing],
-      evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
-      evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
+      evaluateParameters: ParameterMap = ParameterMap.Empty)
+      (implicit evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
     : DataSet[(PredictionValue, PredictionValue)] = {
     FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
     evaluator.evaluateDataSet(this, evaluateParameters, testing)
   }
 }
 
+trait RankingPredictor[Self] extends Estimator[Self] with WithParameters {
+  that: Self =>
+
+  def predictRankings(
+    k: Int,
+    users: DataSet[Int],
+    predictParameters: ParameterMap = ParameterMap.Empty)(implicit
+    rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] =
+    rankingPredictOperation.predictRankings(this, k, users, predictParameters)
+
+  def evaluateRankings(
+    testing: DataSet[(Int,Int,Double)],
+    evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
+    rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] = {
+    // todo: do not burn 100 topK into code
+    predictRankings(100, testing.map(_._1).distinct(), evaluateParameters)
+  }
+}
+
+trait RankingPredictOperation[Instance] {
+  def predictRankings(
+    instance: Instance,
+    k: Int,
+    users: DataSet[Int],
+    predictParameters: ParameterMap = ParameterMap.Empty)
+  : DataSet[(Int, Int, Int)]
+}
+
+/**
+  * Trait for providing auxiliary data for ranking evaluations.
+  *
+  * They are useful e.g. for excluding items found in the training [[DataSet]]
+  * from the recommended top K items.
+  */
+trait TrainingRatingsProvider {
+
+  def getTrainingData: DataSet[(Int, Int, Double)]
+
+  /**
+    * Retrieving the training items.
+    * Although this can be calculated from the training data, it requires a costly
+    * [[DataSet.distinct]] operation, while in matrix factor models the set items could be
+    * given more efficiently from the item factors.
+    */
+  def getTrainingItems: DataSet[Int] = {
+    getTrainingData.map(_._2).distinct()
+  }
+}
+
+/**
+  * Ranking predictions for the most common case.
+  * If we can predict ratings, we can compute top K lists by sorting the predicted ratings.
+  */
+class RankingFromRatingPredictOperation[Instance <: TrainingRatingsProvider]
+(val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, Int, Double)])
+  extends RankingPredictOperation[Instance] {
+
+  private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], exclude: DataSet[(Int, Int)])
+  : DataSet[(Int, Int)] = {
+    users.cross(items)
--- End diff --

I completely agree. There are use cases where we would not like to produce 
rankings over all the items. E.g. when recommending TV programs, we would only 
like to recommend currently running TV programs, but train on all of them.

We'll include an `item` DataSet parameter in the ranking predictions.

(Btw. I believe the "Flink-way" is to let the user configure as much as 
possible, but that's just my opinion :) )
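
For illustration only, the extra parameter could look roughly like this; the 
parameter name, its position, and the import paths are my assumptions, not 
necessarily what will land in the PR:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.ParameterMap

// Hypothetical sketch: restrict ranking predictions to a caller-supplied candidate
// item set instead of always ranking every training item.
trait RankingPredictOperation[Instance] {
  def predictRankings(
      instance: Instance,
      k: Int,
      users: DataSet[Int],
      items: DataSet[Int], // candidate items to rank, e.g. currently running TV programs
      predictParameters: ParameterMap = ParameterMap.Empty)
    : DataSet[(Int, Int, Int)] // (user, item, rank)
}
```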




[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...

2016-11-24 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r89489117
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala ---
@@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters {
     */
   def evaluate[Testing, PredictionValue](
       testing: DataSet[Testing],
-      evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
-      evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
+      evaluateParameters: ParameterMap = ParameterMap.Empty)
+      (implicit evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
     : DataSet[(PredictionValue, PredictionValue)] = {
     FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
     evaluator.evaluateDataSet(this, evaluateParameters, testing)
   }
 }
 
+trait RankingPredictor[Self] extends Estimator[Self] with WithParameters {
+  that: Self =>
+
+  def predictRankings(
+    k: Int,
+    users: DataSet[Int],
+    predictParameters: ParameterMap = ParameterMap.Empty)(implicit
+    rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] =
+    rankingPredictOperation.predictRankings(this, k, users, predictParameters)
+
+  def evaluateRankings(
+    testing: DataSet[(Int,Int,Double)],
+    evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
+    rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] = {
+    // todo: do not burn 100 topK into code
+    predictRankings(100, testing.map(_._1).distinct(), evaluateParameters)
+  }
+}
+
+trait RankingPredictOperation[Instance] {
+  def predictRankings(
+    instance: Instance,
+    k: Int,
+    users: DataSet[Int],
+    predictParameters: ParameterMap = ParameterMap.Empty)
+  : DataSet[(Int, Int, Int)]
+}
+
+/**
+  * Trait for providing auxiliary data for ranking evaluations.
+  *
+  * They are useful e.g. for excluding items found in the training [[DataSet]]
+  * from the recommended top K items.
+  */
+trait TrainingRatingsProvider {
+
+  def getTrainingData: DataSet[(Int, Int, Double)]
+
+  /**
+    * Retrieving the training items.
+    * Although this can be calculated from the training data, it requires a costly
+    * [[DataSet.distinct]] operation, while in matrix factor models the set items could be
+    * given more efficiently from the item factors.
+    */
+  def getTrainingItems: DataSet[Int] = {
+    getTrainingData.map(_._2).distinct()
+  }
+}
+
+/**
+  * Ranking predictions for the most common case.
+  * If we can predict ratings, we can compute top K lists by sorting the predicted ratings.
+  */
+class RankingFromRatingPredictOperation[Instance <: TrainingRatingsProvider]
+(val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, Int, Double)])
+  extends RankingPredictOperation[Instance] {
+
+  private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], exclude: DataSet[(Int, Int)])
+  : DataSet[(Int, Int)] = {
+    users.cross(items)
--- End diff --

You're right. Although there's not much we can do in general to avoid this, 
we might be able to optimize for matrix factorization. This solution works for 
*every* predictor that predicts ratings, and we currently use it in ALS 
([here](https://github.com/apache/flink/pull/2838/files/45c98a97ef82d1012062dbcf6ade85a8d566062d#diff-80639a21b8fd166b5f7df5280cd609a9R467)).
 With a matrix factorization model *specifically*, we can avoid materializing 
all user-item pairs as tuples and compute the rankings more directly, which 
might be more efficient. So we could use a more specific `RankingPredictor` 
implementation in `ALS`. But even in that case, we still need to go through all 
the items for a particular user to calculate the top k items for that user.

Also, this is only calculated for the users we'd like to give rankings to, 
e.g. in a testing scenario, for the users in the test data, which may be 
significantly fewer than the users in the training data.

I suggest keeping this anyway, as it is the general solution. We might come up 
with a solution that's slightly more efficient in most cases for MF models. 
Should we put effort into working on it? What do you think?
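
To make the general path concrete, here is a rough sketch (not the PR's code; 
the helper name `topKFromPredictedRatings` is made up) of how top-k lists per 
user can be derived from predicted ratings with the DataSet API:

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// Sketch: turn predicted ratings (user, item, rating) into (user, item, rank)
// tuples, keeping only the k highest-rated items per user.
def topKFromPredictedRatings(
    predictedRatings: DataSet[(Int, Int, Double)],
    k: Int): DataSet[(Int, Int, Int)] = {
  predictedRatings
    .groupBy(0)                       // group by user id
    .sortGroup(2, Order.DESCENDING)   // sort each user's items by predicted rating
    .first(k)                         // keep the k best-rated items per user
    .groupBy(0)
    .sortGroup(2, Order.DESCENDING)
    .reduceGroup { (ratings, out: Collector[(Int, Int, Int)]) =>
      // assign ranks 1..k within each user's sorted group
      var rank = 1
      for ((user, item, _) <- ratings) {
        out.collect((user, item, rank))
        rank += 1
      }
    }
}
```

Even with this general approach, the expensive part remains producing a 
predicted rating for every candidate (user, item) pair in the first place, 
which is exactly the cross being discussed here.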




[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...

2016-11-23 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r89292988
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala ---
@@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters {
     */
   def evaluate[Testing, PredictionValue](
       testing: DataSet[Testing],
-      evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
-      evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
+      evaluateParameters: ParameterMap = ParameterMap.Empty)
+      (implicit evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
     : DataSet[(PredictionValue, PredictionValue)] = {
     FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
     evaluator.evaluateDataSet(this, evaluateParameters, testing)
   }
 }
 
+trait RankingPredictor[Self] extends Estimator[Self] with WithParameters {
+  that: Self =>
+
+  def predictRankings(
+    k: Int,
+    users: DataSet[Int],
+    predictParameters: ParameterMap = ParameterMap.Empty)(implicit
+    rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] =
+    rankingPredictOperation.predictRankings(this, k, users, predictParameters)
+
+  def evaluateRankings(
+    testing: DataSet[(Int,Int,Double)],
+    evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
+    rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] = {
+    // todo: do not burn 100 topK into code
+    predictRankings(100, testing.map(_._1).distinct(), evaluateParameters)
+  }
+}
+
+trait RankingPredictOperation[Instance] {
+  def predictRankings(
+    instance: Instance,
+    k: Int,
+    users: DataSet[Int],
+    predictParameters: ParameterMap = ParameterMap.Empty)
+  : DataSet[(Int, Int, Int)]
+}
+
+/**
+  * Trait for providing auxiliary data for ranking evaluations.
+  *
+  * They are useful e.g. for excluding items found in the training [[DataSet]]
+  * from the recommended top K items.
+  */
+trait TrainingRatingsProvider {
+
+  def getTrainingData: DataSet[(Int, Int, Double)]
+
+  /**
+    * Retrieving the training items.
+    * Although this can be calculated from the training data, it requires a costly
+    * [[DataSet.distinct]] operation, while in matrix factor models the set items could be
+    * given more efficiently from the item factors.
+    */
+  def getTrainingItems: DataSet[Int] = {
+    getTrainingData.map(_._2).distinct()
+  }
+}
+
+/**
+  * Ranking predictions for the most common case.
+  * If we can predict ratings, we can compute top K lists by sorting the predicted ratings.
+  */
+class RankingFromRatingPredictOperation[Instance <: TrainingRatingsProvider]
+(val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, Int, Double)])
+  extends RankingPredictOperation[Instance] {
+
+  private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], exclude: DataSet[(Int, Int)])
+  : DataSet[(Int, Int)] = {
+    users.cross(items)
--- End diff --

This could very well blow up. Do we have any limits on the size of the 
`users` and `items` DataSets?

If I understand correctly, `users` comes from the calling `predictRankings` 
function and contains user ids (potentially all users), and `items` are all the 
items in the training set, which could be in the millions.
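
Just to put rough numbers on the concern (purely illustrative magnitudes, not 
measurements from this PR):

```scala
// Hypothetical sizes, only to show how quickly the cross product grows:
val numUsers = 100000L            // users handed to predictRankings
val numItems = 1000000L           // distinct items in the training set
val pairs = numUsers * numItems   // 1e11 (user, item) tuples to materialize and score
```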




[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...

2016-11-21 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r88868762
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala ---
@@ -267,6 +401,21 @@ trait PredictOperation[Instance, Model, Testing, Prediction] extends Serializable
   def predict(value: Testing, model: Model): Prediction
 }
 
+/**
+  * Operation for preparing a testing [[DataSet]] for evaluation.
+  *
+  * The most commonly [[EvaluateDataSetOperation]] is used, but evaluation of
+  * ranking recommendations need input in a different form.
+  */
+trait PrepareOperation[Instance, Testing, Prepared] extends Serializable {
--- End diff --

`PrepareOperation` is the common trait for `EvaluateDataSetOperation` and for 
preparing ranking evaluations.
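
The shape of such a trait could be roughly the following; the `prepare` method 
name and its exact parameters are my guesses for illustration, not necessarily 
the PR's definition:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.ParameterMap

// Sketch: a PrepareOperation turns the raw testing DataSet into whatever form
// ("Prepared") the corresponding Score expects, e.g. (truth, prediction) pairs
// for pairwise scores, or (ratings, rankings) for ranking scores.
trait PrepareOperation[Instance, Testing, Prepared] extends Serializable {
  def prepare(
      instance: Instance,
      testing: DataSet[Testing],
      parameters: ParameterMap): Prepared
}
```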




[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...

2016-11-21 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r88868369
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala ---
@@ -18,12 +18,37 @@
 
 package org.apache.flink.ml.evaluation
 
+import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.ml._
+import org.apache.flink.ml.pipeline._
+import org.apache.flink.ml.RichNumericDataSet
+import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
 
+trait Score[
--- End diff --

`Score` is the common trait for `RankingScore` and `PairwiseScore`.
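
A simplified, illustrative view of that hierarchy (member names and result 
types are assumptions; the real `Score` trait is type-parameterized, which the 
diff cuts off here):

```scala
import org.apache.flink.api.scala._

// Sketch only: a PairwiseScore consumes (trueValue, predictedValue) pairs,
// a RankingScore consumes true ratings plus predicted rankings; both share Score.
trait Score

trait PairwiseScore[P] extends Score {
  // e.g. RMSE over (trueRating, predictedRating) pairs
  def evaluate(pairs: DataSet[(P, P)]): DataSet[Double]
}

trait RankingScore extends Score {
  // e.g. nDCG over predicted top-k rankings, given the true ratings
  def evaluate(
      trueRatings: DataSet[(Int, Int, Double)],
      predictedRankings: DataSet[(Int, Int, Int)]): DataSet[Double]
}
```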




[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...

2016-11-21 Thread gaborhermann
GitHub user gaborhermann opened a pull request:

https://github.com/apache/flink/pull/2838

[FLINK-4712] [FLINK-4713] [ml] Ranking recommendation & evaluation (WIP)

Please note that this is a work-in-progress PR for discussing API design 
decisions. We propose here a class hierarchy for fitting ranking evaluations 
into the proposed evaluation framework (see 
[PR](https://github.com/apache/flink/pull/1849)).
The features are mostly working, but documentation is missing and minor 
refactoring is needed. The evaluations currently work with top-100 rankings 
(hard-coded), and we still need to fix that. We need feedback on the two main 
design points below, so we can move forward with the PR. Thanks for any comments!

### `RankingPredictor`

We have managed to rework the evaluation framework proposed by @thvasilo so 
that ranking predictions fit in. Our approach is to use separate 
`RankingPredictor` and `Predictor` traits. One main problem, however, remains: 
there is no common superclass for `RankingPredictor` and `Predictor`, so the 
pipelining mechanism might not work. A `Predictor` can only be at the end of 
a pipeline, so this should not really be a problem, but I do not know for 
sure. An alternative solution would be to have two different objects, `ALS` and 
`RankingALS`, that give different predictions but both extend only 
`Predictor`. There could be implicit conversions between the two. I would 
prefer the current solution if it does not break the pipelining. @thvasilo, what 
do you think about this?

(This seems similar to having a `predict_proba` function in scikit-learn 
classification models, where the same model for the same input gives two 
different kinds of predictions: `predict` for discrete predictions and 
`predict_proba` for a probability.)
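
For comparison, a compact sketch of the alternative design (two model objects 
that both stay plain `Predictor`s, bridged by an implicit conversion); every 
name and signature below is made up to illustrate the idea and is not code from 
this PR:

```scala
import org.apache.flink.api.scala._
import scala.language.implicitConversions

// Hypothetical alternative: ALS keeps predicting ratings as a plain Predictor would,
// and a separate RankingALS view exposes ranking predictions on the same fitted model.
class ALS {
  def predict(userItemPairs: DataSet[(Int, Int)]): DataSet[(Int, Int, Double)] = ???
}

class RankingALS(val underlying: ALS) {
  def predictRankings(k: Int, users: DataSet[Int]): DataSet[(Int, Int, Int)] = ???
}

object RankingALS {
  // the implicit conversion mentioned above: use a fitted ALS wherever rankings are wanted
  implicit def fromALS(als: ALS): RankingALS = new RankingALS(als)
}
```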

### Generalizing `EvaluateDataSetOperation`

On the other hand, we seem to have solved the scoring issue. Users can 
evaluate a recommendation algorithm such as ALS either with a score operating on 
rankings (e.g. nDCG) or with a score operating on ratings (e.g. RMSE). They only 
need to change the `Score` they use in their code, and nothing else.

The main problem was that the `evaluate` method and 
`EvaluateDataSetOperation` were not general enough. They prepare the evaluation 
into `(trueValue, predictedValue)` pairs (i.e. a `DataSet[(PredictionType, 
PredictionType)]`), while ranking evaluations need a more general input: the 
true ratings (`DataSet[(Int,Int,Double)]`) and the predicted rankings 
(`DataSet[(Int,Int,Int)]`).

Instead of using `EvaluateDataSetOperation`, we use a more general 
`PrepareOperation`. We rename the `Score` in the original evaluation framework 
to `PairwiseScore`. `RankingScore` and `PairwiseScore` have a common trait, 
`Score`. This way the user can use both a `RankingScore` and a `PairwiseScore` 
for a given model, and only needs to alter the score used in the code.

In the case of pairwise scores (which only need true and predicted value pairs 
for evaluation), `EvaluateDataSetOperation` is used as a `PrepareOperation`. It 
prepares the evaluation by creating `(trueValue, predictedValue)` pairs from 
the test dataset. Thus, the result of preparing, and the input of 
`PairwiseScore`s, will be a `DataSet[(PredictionType, PredictionType)]`. In the 
case of rankings, the `PrepareOperation` passes the test dataset through and 
creates the rankings. The result of preparing, and the input of 
`RankingScore`s, will be `(DataSet[(Int,Int,Double)], DataSet[(Int,Int,Int)])`. 
I believe this is a fairly acceptable solution that avoids breaking the API.
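
As a self-contained illustration of what a ranking score consumes (true 
ratings plus predicted rankings), here is a precision@k sketch; the metric and 
the helper name are only examples, and the PR's actual `RankingScore` 
implementations may look quite different:

```scala
import org.apache.flink.api.scala._

// Sketch: precision@k per user, averaged over users that have at least one hit.
// A real implementation would also count users with zero hits as 0.0.
def precisionAtK(
    trueRatings: DataSet[(Int, Int, Double)],    // (user, item, rating) from the test set
    predictedRankings: DataSet[(Int, Int, Int)], // (user, item, rank)
    k: Int): DataSet[Double] = {

  val relevant = trueRatings.map(r => (r._1, r._2))  // items each user actually rated
  val recommended = predictedRankings.filter(_._3 <= k).map(r => (r._1, r._2))

  val hitsPerUser = recommended
    .join(relevant).where(0, 1).equalTo(0, 1)        // recommended items that were relevant
    .map(joined => (joined._1._1, 1))
    .groupBy(0)
    .sum(1)

  hitsPerUser
    .map(h => (h._2.toDouble / k, 1L))               // per-user precision@k
    .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
    .map(sum => sum._1 / sum._2)                     // average over users
}
```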

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborhermann/flink ranking-rec-eval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2838





