Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11263#discussion_r54164432
  
    --- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala ---
    @@ -88,6 +110,204 @@ class AssociationRules private[fpm] (
         val tag = fakeClassTag[Item]
         run(freqItemsets.rdd)(tag)
       }
    +
    +  /**
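    +   * Generates association rules from frequent itemsets encoded as item indices.
    +   *
    +   * @param freqItemIndices Frequent itemsets (as sequences of item indices) with their frequencies.
    +   * @param minConfidence   Minimum confidence (freq(antecedent ∪ consequent) / freq(antecedent))
    +   *                        a rule must reach to be kept.
    +   * @param maxConsequent   Maximum number of items allowed in a rule's consequent.
    +   * @return rules as (antecedent, consequent, freq(antecedent ∪ consequent), freq(antecedent)).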
    +   *
    +   */
    +  @Since("2.0.0")
    +  private def genRules(freqItemIndices: RDD[(Seq[Int], Long)],
    +                       minConfidence: Double,
    +                       maxConsequent: Int
    +                      ): RDD[(Seq[Int], Seq[Int], Long, Long)] = {
    +
    +    val sc = freqItemIndices.sparkContext
    +
    +    val initCandidates = freqItemIndices.flatMap {
    +      case (indices, freq) =>
    +        indices.flatMap {
    +          index =>
    +            indices.partition(_ == index) match {
    +              case (consequent, antecendent) if antecendent.nonEmpty =>
    +                Some((antecendent, (consequent, freq)))
    +              case _ => None
    +            }
    +        }
    +    }
    +
    +    val initRules = sc.emptyRDD[(Seq[Int], Seq[Int], Long, Long)]
    +
    +    @tailrec
    +    def loop(candidates: RDD[(Seq[Int], (Seq[Int], Long))],
    +             lenConsequent: Int,
    +             rules: RDD[(Seq[Int], Seq[Int], Long, Long)]
    +            ): RDD[(Seq[Int], Seq[Int], Long, Long)] = {
    +
    +      val numCandidates = candidates.count()
    +
    +      log.info(s"Candidates for ${lenConsequent}-consequent rules : 
${numCandidates}")
    +
    +      if (numCandidates == 0 || lenConsequent > maxConsequent) {
    +        rules
    +      } else {
    +        val newRules = candidates.join(freqItemIndices).filter{
    +          case (antecendent, ((consequent, freqUnion), freqAntecedent)) =>
    +            freqUnion >= minConfidence * freqAntecedent
    +        }.map {
    +          case (antecendent, ((consequent, freqUnion), freqAntecedent)) =>
    +            (antecendent, consequent, freqUnion, freqAntecedent)
    +        }
    +
    +        val numNewRules = newRules.count()
    +        log.info(s"Generated ${lenConsequent}-consequent rules : 
${numNewRules}")
    +
    +        if (numNewRules == 0) {
    +          rules
    +        } else if (lenConsequent == maxConsequent) {
    +          sc.union(rules, newRules)
    +        } else {
    +          val numPairs = (lenConsequent + 1) * lenConsequent / 2
    +          val newCandidates = newRules.filter{
    +            // rules whose antecendent's length equals to 1 can not be 
used to generate new rules
    +            case (antecendent, consequent, freqUnion, freqAntecedent) =>
    +              antecendent.size > 1
    +          }.map {
    +            case (antecendent, consequent, freqUnion, freqAntecedent) =>
    +              val union = seqAdd(antecendent, consequent)
    +              ((union, freqUnion), consequent)
    +          }.groupByKey().filter {
    +            // Rule pruning. For a (lenConsequent + 1)-consequent rules, 
at least
    +            // (lenConsequent + 1) lenConsequent-consequent rules are 
needed.
    +            case ((union, freqUnion), consequents) =>
    +              consequents.size > lenConsequent
    +          }.flatMap {
    +            case ((union, freqUnion), consequents) =>
    +              val array = consequents.toArray
    +              val newConsequentCount = collection.mutable.Map[Seq[Int], 
Int]()
    +              for (i <- 0 until array.length; j <- i + 1 until 
array.length) {
    +                val newConsequent = seqAdd(array(i), array(j))
    +                if (newConsequent.length == lenConsequent + 1) {
    +                  val cnt = newConsequentCount.getOrElse(newConsequent, 0)
    +                  newConsequentCount.update(newConsequent, cnt + 1)
    +                }
    +              }
    --- End diff --
    
    This paragraph is too long. We should break it down and give meaningful 
names to the intermediate results.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to