Github user frreiss commented on a diff in the pull request:
https://github.com/apache/spark/pull/12574#discussion_r60775792
--- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
---
@@ -218,11 +292,135 @@ class ALSModel private[ml] (
predict(userFactors("features"),
itemFactors("features")).as($(predictionCol)))
}
+  /**
+   * Generate top `num` recommended items for each user (or recommended users for each item) in the
+   * input [[DataFrame]].
+   * If `dstCol` exists in the input schema, then the result will contain the recommendations and
+   * the actual "ground truth" ids (taken from those ids present in `dstCol`). If `dstCol` doesn't
+   * exist, then the result will only contain recommendations.
+   * @param dataset [[DataFrame]] containing a column of user or item ids for which to recommend.
+   * @param srcFactors user / item factors for which to generate top-k recommendations.
+   * @param dstFactors candidate user / item factors from which to generate top-k recommendations.
+   * @param srcCol name of column containing ids for which to recommend.
+   * @param dstCol name of column containing ids used to generate the "ground truth" set (if the
+   *               column exists).
+   * @param num how many recommended items (or users) to compute for each user (or item).
+   * @return [[DataFrame]] containing recommendations (and the "ground truth" set if `dstCol`
+   *         exists) for each id in `srcCol`.
+   */
+  private def recommendUsersOrItems(
+      dataset: Dataset[_],
+      srcFactors: DataFrame,
+      dstFactors: DataFrame,
+      srcCol: String,
+      dstCol: String,
+      num: Int): DataFrame = {
+
+    import dataset.sqlContext.implicits._
+    val schema = dataset.schema
+
+    val factors = dataset
+      .select(srcCol)
+      .distinct
+      .join(srcFactors, dataset(srcCol) === srcFactors("id"), "left")
+      .select(srcFactors("id"), srcFactors("features"))
+
+    val topKRaw = ALSModel.recommendForAll(rank, factors, dstFactors, num)
+    val topK = if ($(withScores)) {
+      // with scores, 'predictions' is an Array((id1, score1), (id2, score2), ...)
+      topKRaw.toDF("id", "recommendations")
+    } else {
+      // without scores, 'predictions' is an Array(id1, id2, ...)
+      topKRaw
+        .map { case (id, predsWithScores) => (id, predsWithScores.map(_._1)) }
+        .toDF("id", "recommendations")
+    }
+
+    val result = if (schema.fieldNames.contains(dstCol)) {
+      // if 'dstCol' exists, we group by that column to generate the 'ground truth' set of
+      // user (or item) ids.
+      // Note, this changes the structure of the input DataFrame, returning a row of
+      // (id, predictions, actual) per unique id in 'srcCol', discarding all other columns.
+      // In practice this is expected only during model selection with
+      // CrossValidator/TrainValidationSplit using RankingEvaluator.
+      val actual = dataset
+        .select(srcCol, dstCol)
+        .as[(Int, Int)]
+        .groupByKey(_._1)
+        .mapGroups { case (src, ids) => (src, ids.map(_._2).toArray) }
+        .toDF("id", "actual")
+
+      dataset
+        .select(srcCol)
+        .distinct
+        .join(topK, dataset(srcCol) === topK("id"))
+        .join(actual, dataset(srcCol) === actual("id"))
+        .select(
--- End diff --
These two joins and the `distinct` should probably be replaced with a
`cogroup.mapGroups`, since you know that `id` is a key and `dataset` is
unlikely to contain many duplicates. As far as I can see, the current Catalyst
rules will generate a conservative plan with three shuffle stages for the
expression `distinct.join.join`, since there is no key-constraint information
available to collapse them.
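The cogroup alternative can be sketched with plain Scala collections — Spark's
`KeyValueGroupedDataset.cogroup` applies the same kind of per-key function to
both grouped sides in a single pass. The helper name and the in-memory `Map`
inputs below are illustrative only, not Spark API; in the actual patch the two
sides would be the `topK` and `actual` datasets keyed by `id`:

```scala
// Hypothetical sketch of the per-key merge a cogroup.mapGroups would perform:
// one pass over the union of keys, instead of distinct + two joins.
// Missing sides yield empty arrays, mirroring an outer cogroup.
def cogroupRecsWithActual(
    topK: Map[Int, Array[Int]],     // id -> recommended ids
    actual: Map[Int, Array[Int]]    // id -> "ground truth" ids
  ): Seq[(Int, Array[Int], Array[Int])] = {
  val keys = topK.keySet ++ actual.keySet
  keys.toSeq.sorted.map { id =>
    (id,
     topK.getOrElse(id, Array.empty[Int]),
     actual.getOrElse(id, Array.empty[Int]))
  }
}
```

Because each `id` appears at most once on each side, the merge function is
trivial; the win is that Catalyst (or here, the single traversal) touches each
key exactly once rather than shuffling three times.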