Github user feynmanliang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7937#discussion_r36243489
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
    @@ -83,45 +82,109 @@ class PrefixSpan private (
       /**
        * Gets the maximal pattern length (i.e. the length of the longest sequential pattern to consider).
        */
    -  def getMaxPatternLength: Double = this.maxPatternLength
    +  def getMaxPatternLength: Double = maxPatternLength
     
       /**
        * Sets maximal pattern length (default: `10`).
        */
       def setMaxPatternLength(maxPatternLength: Int): this.type = {
         // TODO: support unbounded pattern length when maxPatternLength = 0
    -    require(maxPatternLength >= 1, "The maximum pattern length value must be greater than 0.")
    +    require(maxPatternLength >= 1,
    +      s"The maximum pattern length value must be greater than 0, but got 
$maxPatternLength.")
         this.maxPatternLength = maxPatternLength
         this
       }
     
       /**
    -   * Find the complete set of sequential patterns in the input sequences of itemsets.
    -   * @param data ordered sequences of itemsets.
    -   * @return a [[PrefixSpanModel]] that contains the frequent sequences
    +   * Gets the maximum number of items allowed in a projected database before local processing.
    +   */
    +  def getMaxLocalProjDBSize: Long = maxLocalProjDBSize
    +
    +  /**
    +   * Sets the maximum number of items allowed in a projected database before local processing
    +   * (default: `32000000L`).
    +   */
    +  def setMaxLocalProjDBSize(maxLocalProjDBSize: Long): this.type = {
    +    require(maxLocalProjDBSize >= 0L,
    +      s"The maximum local projected database size must be nonnegative, but 
got $maxLocalProjDBSize")
    +    this.maxLocalProjDBSize = maxLocalProjDBSize
    +    this
    +  }
    +
    +  /**
    +   * Finds the complete set of frequent sequential patterns in the input sequences of itemsets.
    +   * @param data sequences of itemsets.
    +   * @return a [[PrefixSpanModel]] that contains the frequent patterns
        */
       def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): PrefixSpanModel[Item] = {
    -    val itemToInt = data.aggregate(Set[Item]())(
    -      seqOp = { (uniqItems, item) => uniqItems ++ item.flatten.toSet },
    -      combOp = { _ ++ _ }
    -    ).zipWithIndex.toMap
    -    val intToItem = Map() ++ (itemToInt.map { case (k, v) => (v, k) })
    -
    -    val dataInternalRepr = data.map { seq =>
    -      seq.map(itemset => itemset.map(itemToInt)).reduce((a, b) => a ++ (DELIMITER +: b))
    +    if (data.getStorageLevel == StorageLevel.NONE) {
    +      logWarning("Input data is not cached.")
         }
    -    val results = run(dataInternalRepr)
     
    -    def toPublicRepr(pattern: Iterable[Int]): List[Array[Item]] = {
    -      pattern.span(_ != DELIMITER) match {
    -        case (x, xs) if xs.size > 1 => x.map(intToItem).toArray :: toPublicRepr(xs.tail)
    -        case (x, xs) => List(x.map(intToItem).toArray)
    +    val totalCount = data.count()
    +    logInfo(s"number of sequences: $totalCount")
    +    val minCount = math.ceil(minSupport * totalCount).toLong
    +    logInfo(s"minimum count for a frequent pattern: $minCount")
    +
    +    // Find frequent items.
    +    val freqItemAndCounts = data.flatMap { itemsets =>
    +        val uniqItems = mutable.Set.empty[Item]
    +        itemsets.foreach { _.foreach { item =>
    +          uniqItems += item
    +        }}
    +        uniqItems.toIterator.map((_, 1L))
    +      }.reduceByKey(_ + _)
    +      .filter { case (_, count) =>
    +        count >= minCount
    +      }.collect()
    +    val freqItems = freqItemAndCounts.sortBy(-_._2).map(_._1)
    +    logInfo(s"number of frequent items: ${freqItems.size}")
    +
    +    // Keep only frequent items from input sequences and convert them to internal storage.
    +    val itemToInt = freqItems.zipWithIndex.toMap
    +    val dataInternalRepr = data.map { itemsets =>
    +      val allItems = mutable.ArrayBuilder.make[Int]
    +      allItems += 0
    +      itemsets.foreach { itemset =>
    +        val items = mutable.ArrayBuilder.make[Int]
    +        itemset.foreach { item =>
    +          if (itemToInt.contains(item)) {
    +            items += itemToInt(item) + 1 // using 1-indexing in internal format
    +          }
    +        }
    +        val result = items.result()
    +        if (result.nonEmpty) {
    +          allItems ++= result.sorted
    +        }
    +        allItems += 0
    +      }
    +      allItems.result()
    --- End diff --
    
    Should we filter on `allItems.nonEmpty` here (e.g. via flatMap with Option) to remove sequences whose itemsets are all non-frequent?
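    
    Note that with the encoding above, a sequence with no surviving frequent items still yields a non-empty `allItems` (the delimiter zeros), so a literal `allItems.nonEmpty` check would always pass. A rough sketch of the flatMap-with-Option variant, reusing `data`, `itemToInt`, and `mutable` from the diff (the `containsFreqItems` flag is a hypothetical name):
    
        // Sketch only: emit None for sequences with no surviving frequent items,
        // so they are dropped instead of becoming all-delimiter arrays.
        val dataInternalRepr = data.flatMap { itemsets =>
          var containsFreqItems = false
          val allItems = mutable.ArrayBuilder.make[Int]
          allItems += 0
          itemsets.foreach { itemset =>
            val items = mutable.ArrayBuilder.make[Int]
            itemset.foreach { item =>
              if (itemToInt.contains(item)) {
                items += itemToInt(item) + 1 // using 1-indexing in internal format
              }
            }
            val result = items.result()
            if (result.nonEmpty) {
              containsFreqItems = true
              allItems ++= result.sorted
            }
            allItems += 0
          }
          if (containsFreqItems) Some(allItems.result()) else None
        }
    
    This keeps the internal format unchanged for surviving sequences and only drops those that can never contribute a frequent pattern.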

