[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...
Github user gaborhermann commented on a diff in the pull request: https://github.com/apache/flink/pull/2838#discussion_r89503899

--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala ---
@@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters {
     */
   def evaluate[Testing, PredictionValue](
       testing: DataSet[Testing],
-      evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
-      evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
+      evaluateParameters: ParameterMap = ParameterMap.Empty)
+      (implicit evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
     : DataSet[(PredictionValue, PredictionValue)] = {
     FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
     evaluator.evaluateDataSet(this, evaluateParameters, testing)
   }
 }
+
+trait RankingPredictor[Self] extends Estimator[Self] with WithParameters {
+  that: Self =>
+
+  def predictRankings(
+      k: Int,
+      users: DataSet[Int],
+      predictParameters: ParameterMap = ParameterMap.Empty)(implicit
+      rankingPredictOperation: RankingPredictOperation[Self])
+    : DataSet[(Int, Int, Int)] =
+    rankingPredictOperation.predictRankings(this, k, users, predictParameters)
+
+  def evaluateRankings(
+      testing: DataSet[(Int, Int, Double)],
+      evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
+      rankingPredictOperation: RankingPredictOperation[Self])
+    : DataSet[(Int, Int, Int)] = {
+    // todo: do not burn 100 topK into code
+    predictRankings(100, testing.map(_._1).distinct(), evaluateParameters)
+  }
+}
+
+trait RankingPredictOperation[Instance] {
+  def predictRankings(
+      instance: Instance,
+      k: Int,
+      users: DataSet[Int],
+      predictParameters: ParameterMap = ParameterMap.Empty)
+    : DataSet[(Int, Int, Int)]
+}
+
+/**
+ * Trait for providing auxiliary data for ranking evaluations.
+ *
+ * They are useful e.g. for excluding items found in the training [[DataSet]]
+ * from the recommended top K items.
+ */
+trait TrainingRatingsProvider {
+
+  def getTrainingData: DataSet[(Int, Int, Double)]
+
+  /**
+   * Retrieves the training items.
+   * Although these can be calculated from the training data, that requires a costly
+   * [[DataSet.distinct]] operation, while in matrix factor models the set of items could be
+   * given more efficiently from the item factors.
+   */
+  def getTrainingItems: DataSet[Int] = {
+    getTrainingData.map(_._2).distinct()
+  }
+}
+
+/**
+ * Ranking predictions for the most common case.
+ * If we can predict ratings, we can compute top K lists by sorting the predicted ratings.
+ */
+class RankingFromRatingPredictOperation[Instance <: TrainingRatingsProvider]
+    (val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, Int, Double)])
+  extends RankingPredictOperation[Instance] {
+
+  private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], exclude: DataSet[(Int, Int)])
+    : DataSet[(Int, Int)] = {
+    users.cross(items)
--- End diff --

I completely agree. There are use cases where we would not like to give rankings over all the items. E.g. when recommending TV programs, we would only like to recommend currently running TV programs, but train on all of them. We'll include an `items` DataSet parameter in ranking predictions. (Btw. I believe the "Flink way" is to let the user configure as much as possible, but that's just my opinion :) )

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
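To make the discussed flow concrete, here is a minimal, hedged sketch of ranking-from-ratings with a caller-supplied candidate item set, written with plain Scala collections (`Seq`/`Set` stand in for Flink DataSets; `rankFromRatings` and the `predict` argument are illustrative names, not the PR's API):

```scala
object RankingFromRatingsSketch {
  // For each user: build candidate pairs (users x items), drop excluded
  // (e.g. training) pairs, predict a rating per pair, and keep the k best
  // predictions as (user, item, rank) triples, ranks starting at 1.
  def rankFromRatings(
      users: Seq[Int],
      items: Seq[Int],
      exclude: Set[(Int, Int)],
      k: Int,
      predict: (Int, Int) => Double): Seq[(Int, Int, Int)] =
    users.flatMap { user =>
      items
        .filterNot(item => exclude((user, item)))
        .map(item => (item, predict(user, item)))
        .sortBy(-_._2)
        .take(k)
        .zipWithIndex
        .map { case ((item, _), rank) => (user, item, rank + 1) }
    }
}
```

Passing only the currently relevant items (e.g. running TV programs) as `items` restricts the candidate pool without changing what the model was trained on.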
Github user gaborhermann commented on a diff in the pull request: https://github.com/apache/flink/pull/2838#discussion_r89489117

--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala ---
+    users.cross(items)
--- End diff --

You're right. Although there's not much we can do in general to avoid this, we might be able to optimize for matrix factorization. This solution works for *every* predictor that predicts ratings, and we currently use it in ALS ([here](https://github.com/apache/flink/pull/2838/files/45c98a97ef82d1012062dbcf6ade85a8d566062d#diff-80639a21b8fd166b5f7df5280cd609a9R467)). With a matrix factorization model *specifically*, we can avoid materializing all user-item pairs as tuples and compute the rankings more directly, which might be more efficient. So we could use a more specific `RankingPredictor` implementation in `ALS`. But even in that case, we still need to go through all the items for a particular user to calculate the top k items for that user. Also, this is only calculated for the users we'd like to give rankings to; e.g. in a testing scenario, for the users in the test data, which might be significantly fewer than the users in the training data. I suggest keeping this anyway, as it is general.

We might come up with a solution that's slightly more efficient in most cases for MF models. Should we put effort into working on it? What do you think?
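For the matrix-factorization-specific idea mentioned above, here is a hedged collections-based sketch (hypothetical names, not code from the PR): score every candidate item for one user directly from the factor vectors and keep only the top k, instead of first materializing all user-item pairs for a generic rating predictor.

```scala
object MfTopKSketch {
  // Predicted rating in a matrix factorization model: dot product of factors.
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  // Top k items for a single user, as (item, rank) pairs with ranks from 1.
  def topKForUser(
      userFactor: Array[Double],
      itemFactors: Map[Int, Array[Double]],
      k: Int): Seq[(Int, Int)] =
    itemFactors.toSeq
      .map { case (item, f) => (item, dot(userFactor, f)) }
      .sortBy(-_._2)
      .take(k)
      .zipWithIndex
      .map { case ((item, _), rank) => (item, rank + 1) }
}
```

Note that, as the comment says, this still scans all items per user; it only avoids building the intermediate (user, item) tuple DataSet.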
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2838#discussion_r89292988

--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala ---
+    users.cross(items)
--- End diff --

This could very well blow up. Do we have any limits on the size of the `users` and `items` DataSets? If I understand correctly, `users` comes from the calling `predictRankings` function and contains user ids (potentially all users), and `items` are all the items in the training set, which could number in the millions.
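One illustrative mitigation for the concern above (an assumption for discussion, not something the PR implements): estimate the cardinality of the cross product before materializing it and fail fast above a configurable limit. A plain-Scala sketch with `Seq` standing in for DataSet counts:

```scala
object CrossSizeGuard {
  // Fail fast if users x items would materialize too many candidate pairs.
  // maxPairs is an arbitrary illustrative default.
  def checkedCrossSize(
      users: Seq[Int],
      items: Seq[Int],
      maxPairs: Long = 100000000L): Long = {
    val pairs = users.size.toLong * items.size.toLong
    require(
      pairs <= maxPairs,
      s"cross of ${users.size} users and ${items.size} items would produce $pairs pairs")
    pairs
  }
}
```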
Github user gaborhermann commented on a diff in the pull request: https://github.com/apache/flink/pull/2838#discussion_r88868762

--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala ---
@@ -267,6 +401,21 @@ trait PredictOperation[Instance, Model, Testing, Prediction] extends Serializabl
   def predict(value: Testing, model: Model): Prediction
 }
+
+/**
+ * Operation for preparing a testing [[DataSet]] for evaluation.
+ *
+ * Most commonly, [[EvaluateDataSetOperation]] is used, but the evaluation of
+ * ranking recommendations needs input in a different form.
+ */
+trait PrepareOperation[Instance, Testing, Prepared] extends Serializable {
--- End diff --

`PrepareOperation` is the common trait for `EvaluateDataSetOperation` and the operation preparing ranking evaluation.
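A simplified, hedged sketch of how such a `PrepareOperation` generalizes the pairwise case (plain Scala with `Seq` instead of DataSet; `EvaluateSeqOperation` is an illustrative stand-in for `EvaluateDataSetOperation`, not the PR's code):

```scala
// Common abstraction: turn a model plus a test set into whatever form
// a score needs ("Prepared" differs for pairwise and ranking scores).
trait PrepareOperation[Instance, Testing, Prepared] {
  def prepare(instance: Instance, testing: Seq[Testing]): Prepared
}

// Pairwise case: prepare (trueValue, predictedValue) pairs from (id, trueValue)
// test records, using a rating predictor for the model.
class EvaluateSeqOperation[Instance](predict: (Instance, Int) => Double)
  extends PrepareOperation[Instance, (Int, Double), Seq[(Double, Double)]] {
  def prepare(instance: Instance, testing: Seq[(Int, Double)]): Seq[(Double, Double)] =
    testing.map { case (id, truth) => (truth, predict(instance, id)) }
}
```

A ranking prepare operation would instead return the test ratings together with the predicted rankings, as described in the PR text.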
Github user gaborhermann commented on a diff in the pull request: https://github.com/apache/flink/pull/2838#discussion_r88868369

--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala ---
@@ -18,12 +18,37 @@ package org.apache.flink.ml.evaluation
+import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.ml._
+import org.apache.flink.ml.pipeline._
+import org.apache.flink.ml.RichNumericDataSet
+import org.apache.flink.util.Collector
 import scala.reflect.ClassTag
+
+trait Score[
--- End diff --

`Score` is the common trait for `RankingScore` and `PairwiseScore`.
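A minimal sketch of that hierarchy in plain Scala (`Seq` instead of DataSet; `Rmse` here is an illustrative pairwise score for demonstration, not the PR's implementation):

```scala
// Common trait: a score maps prepared evaluation data to a single number.
// The "Prepared" type parameter is what lets pairwise and ranking scores
// share one evaluate entry point.
trait Score[Prepared] {
  def score(prepared: Prepared): Double
}

// A PairwiseScore-style metric: RMSE over (trueValue, predictedValue) pairs.
object Rmse extends Score[Seq[(Double, Double)]] {
  def score(pairs: Seq[(Double, Double)]): Double =
    math.sqrt(pairs.map { case (t, p) => (t - p) * (t - p) }.sum / pairs.size)
}
```

A `RankingScore` would instead take the (true ratings, predicted rankings) pair described in the PR text as its `Prepared` type.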
GitHub user gaborhermann opened a pull request: https://github.com/apache/flink/pull/2838

[FLINK-4712] [FLINK-4713] [ml] Ranking recommendation & evaluation (WIP)

Please note that this is a work-in-progress PR for discussing API design decisions. We propose here a class hierarchy for fitting the ranking evaluations into the proposed evaluation framework (see [PR](https://github.com/apache/flink/pull/1849)). The features are mostly working, but documentation is missing and minor refactoring is needed. The evaluations currently work with top 100 rankings (burnt in), and we still need to fix that. We need feedback on two main solutions, so we can go on with the PR. Thanks for any comment!

### `RankingPredictor`

We have managed to rework the evaluation framework proposed by @thvasilo so that ranking predictions fit in. Our approach is to use separate `RankingPredictor` and `Predictor` traits. One main problem remains, however: there is no common superclass for `RankingPredictor` and `Predictor`, so the pipelining mechanism might not work. A `Predictor` can only be at the end of the pipeline, so this should not really be a problem, but I do not know for sure. An alternative solution would be to have different objects `ALS` and `RankingALS` that give different predictions, but both extend only a `Predictor`. There could be implicit conversions between the two. I would prefer the current solution if it does not break the pipelining. @thvasilo what do you think about this? (This seems to be a problem similar to having a `predict_proba` function in scikit-learn classification models, where the same model for the same input gives two different predictions: `predict` for discrete predictions and `predict_proba` for giving a probability.)

### Generalizing `EvaluateDataSetOperation`

On the other hand, we seem to have solved the scoring issue. The user can evaluate a recommendation algorithm such as ALS by using a score operating on rankings (e.g. nDCG) or a score operating on ratings (e.g. RMSE). They only need to modify the `Score` they use in their code, and nothing else. The main problem was that the evaluate method and `EvaluateDataSetOperation` were not general enough. They prepare the evaluation as `(trueValue, predictedValue)` pairs (i.e. a `DataSet[(PredictionType, PredictionType)]`), while ranking evaluations need a more general input with the true ratings (`DataSet[(Int, Int, Double)]`) and the predicted rankings (`DataSet[(Int, Int, Int)]`). Instead of using `EvaluateDataSetOperation`, we use a more general `PrepareOperation`. We rename the `Score` in the original evaluation framework to `PairwiseScore`; `RankingScore` and `PairwiseScore` have a common trait `Score`. This way the user can use both a `RankingScore` and a `PairwiseScore` for a certain model, and only needs to alter the score used in the code.

In case of pairwise scores (which only need true and predicted value pairs for evaluation), `EvaluateDataSetOperation` is used as a `PrepareOperation`. It prepares the evaluation by creating `(trueValue, predictedValue)` pairs from the test dataset. Thus, the result of preparing, and the input of `PairwiseScore`s, will be a `DataSet[(PredictionType, PredictionType)]`. In case of rankings, the `PrepareOperation` passes the test dataset through and creates the rankings. The result of preparing, and the input of `RankingScore`s, will be `(DataSet[(Int, Int, Double)], DataSet[(Int, Int, Int)])`. I believe this is a fairly acceptable solution that avoids breaking the API.
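As an illustration of a `RankingScore`-style metric over the two inputs described above, here is a hedged plain-Scala sketch of precision@k (`Seq` stands in for DataSet; `precisionAtK` and its relevance rule are illustrative assumptions, not the PR's API):

```scala
object PrecisionAtK {
  // truth:    (user, item, rating) test records
  // rankings: (user, item, rank) predicted top lists
  // An item is treated as relevant when its true rating is positive
  // (an assumption made for this sketch).
  def precisionAtK(
      truth: Seq[(Int, Int, Double)],
      rankings: Seq[(Int, Int, Int)],
      k: Int): Double = {
    val relevant: Set[(Int, Int)] =
      truth.collect { case (u, i, r) if r > 0 => (u, i) }.toSet
    val users = rankings.map(_._1).distinct
    val perUser = users.map { u =>
      rankings
        .filter { case (user, _, rank) => user == u && rank <= k }
        .count { case (user, item, _) => relevant((user, item)) }
        .toDouble / k
    }
    perUser.sum / users.size
  }
}
```

The point of the `PrepareOperation` design is exactly that such a metric consumes the (true ratings, predicted rankings) pair directly, while RMSE consumes `(trueValue, predictedValue)` pairs, with no change to the user-facing evaluate call.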
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gaborhermann/flink ranking-rec-eval

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2838.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2838