GitHub user hhbyyh commented on the issue:
https://github.com/apache/spark/pull/15415
Hi @jkbradley After further performance comparison, I found that using a
broadcast variable gives much better performance for the transform.
I tested with public data from http://fimi.ua.ac.be/data/.
On the kosarak (.gz) dataset (300K records), the current transform takes
more than 3 hours with only 2 rules, while the broadcast version takes only
0.15 sec with 900 rules. (I adjusted support and confidence.)
```
import org.apache.spark.sql.functions.{col, udf}

// Collect the (antecedent, consequent) rule pairs to the driver and
// broadcast them, so each executor gets one read-only copy.
val rules = associationRules.rdd.map(r =>
  (r.getSeq[Int](0), r.getSeq[Int](1))
).collect()
val brRules = dataset.sparkSession.sparkContext.broadcast(rules)
// For each rule, examine the input items and summarize the consequents
val predictUDF = udf((items: Seq[Int]) => brRules.value.flatMap(r =>
  if (r._1.forall(items.contains(_))) r._2 else Seq.empty[Int]
).distinct)
dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
```
The test can be reproduced with the code in these gists:
https://gist.github.com/hhbyyh/06fcf3fdc8f6edda971847bcb5783d99
https://gist.github.com/hhbyyh/889b88ae2176d1263fdc9dd3e29d1c2d
Thinking about it again, the broadcast implementation may well give much
better performance in all cases. The main remaining issue is how to support
generic item types with the UDF; one possible sketch is below.