Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11263#discussion_r54164412
  
    --- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala ---
    @@ -56,30 +59,49 @@ class AssociationRules private[fpm] (
       }
     
       /**
    +   * Sets the maximum size of consequents used by Apriori Algorithm 
(default: `1`).
    +   */
    +  @Since("1.5.0")
    +  def setMaxConsequent(maxConsequent: Int): this.type = {
    +    this.maxConsequent = maxConsequent
    +    this
    +  }
    +
    +  /**
        * Computes the association rules with confidence above 
[[minConfidence]].
    +   *
        * @param freqItemsets frequent itemset model obtained from [[FPGrowth]]
        * @return a [[Set[Rule[Item]]] containing the assocation rules.
        *
        */
       @Since("1.5.0")
       def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]]): 
RDD[Rule[Item]] = {
    -    // For candidate rule X => Y, generate (X, (Y, freq(X union Y)))
    -    val candidates = freqItemsets.flatMap { itemset =>
    -      val items = itemset.items
    -      items.flatMap { item =>
    -        items.partition(_ == item) match {
    -          case (consequent, antecedent) if !antecedent.isEmpty =>
    -            Some((antecedent.toSeq, (consequent.toSeq, itemset.freq)))
    -          case _ => None
    +
    +    val sc = freqItemsets.sparkContext
    +
    +    val freqItems = freqItemsets.filter(_.items.length == 
1).flatMap(_.items).collect()
    +
    +    val freqItemIndices = freqItemsets.mapPartitions {
    +      it =>
    +        val itemToRank = freqItems.zipWithIndex.toMap
    +        it.map {
    +          itemset =>
    +            val indices = 
itemset.items.flatMap(itemToRank.get).sorted.toSeq
    +            (indices, itemset.freq)
             }
    -      }
         }
     
    -    // Join to get (X, ((Y, freq(X union Y)), freq(X))), generate rules, 
and filter by confidence
    -    candidates.join(freqItemsets.map(x => (x.items.toSeq, x.freq)))
    -      .map { case (antecendent, ((consequent, freqUnion), freqAntecedent)) 
=>
    -      new Rule(antecendent.toArray, consequent.toArray, freqUnion, 
freqAntecedent)
    -    }.filter(_.confidence >= minConfidence)
    +    val rules = genRules(freqItemIndices, minConfidence, maxConsequent)
    +
    +    rules.mapPartitions {
    --- End diff --
    
    this is the same as `rules.map { case ... =>`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to