GitHub user feynmanliang commented on a diff in the pull request:
https://github.com/apache/spark/pull/7937#discussion_r36243489
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
---
@@ -83,45 +82,109 @@ class PrefixSpan private (
/**
* Gets the maximal pattern length (i.e. the length of the longest
sequential pattern to consider.
*/
- def getMaxPatternLength: Double = this.maxPatternLength
+ def getMaxPatternLength: Double = maxPatternLength
/**
* Sets maximal pattern length (default: `10`).
*/
def setMaxPatternLength(maxPatternLength: Int): this.type = {
// TODO: support unbounded pattern length when maxPatternLength = 0
- require(maxPatternLength >= 1, "The maximum pattern length value must
be greater than 0.")
+ require(maxPatternLength >= 1,
+ s"The maximum pattern length value must be greater than 0, but got
$maxPatternLength.")
this.maxPatternLength = maxPatternLength
this
}
/**
- * Find the complete set of sequential patterns in the input sequences
of itemsets.
- * @param data ordered sequences of itemsets.
- * @return a [[PrefixSpanModel]] that contains the frequent sequences
+ * Gets the maximum number of items allowed in a projected database
before local processing.
+ */
+ def getMaxLocalProjDBSize: Long = maxLocalProjDBSize
+
+ /**
+ * Sets the maximum number of items allowed in a projected database
before local processing
+ * (default: `32000000L`).
+ */
+ def setMaxLocalProjDBSize(maxLocalProjDBSize: Long): this.type = {
+ require(maxLocalProjDBSize >= 0L,
+ s"The maximum local projected database size must be nonnegative, but
got $maxLocalProjDBSize")
+ this.maxLocalProjDBSize = maxLocalProjDBSize
+ this
+ }
+
+ /**
+ * Finds the complete set of frequent sequential patterns in the input
sequences of itemsets.
+ * @param data sequences of itemsets.
+ * @return a [[PrefixSpanModel]] that contains the frequent patterns
*/
def run[Item: ClassTag](data: RDD[Array[Array[Item]]]):
PrefixSpanModel[Item] = {
- val itemToInt = data.aggregate(Set[Item]())(
- seqOp = { (uniqItems, item) => uniqItems ++ item.flatten.toSet },
- combOp = { _ ++ _ }
- ).zipWithIndex.toMap
- val intToItem = Map() ++ (itemToInt.map { case (k, v) => (v, k) })
-
- val dataInternalRepr = data.map { seq =>
- seq.map(itemset => itemset.map(itemToInt)).reduce((a, b) => a ++
(DELIMITER +: b))
+ if (data.getStorageLevel == StorageLevel.NONE) {
+ logWarning("Input data is not cached.")
}
- val results = run(dataInternalRepr)
- def toPublicRepr(pattern: Iterable[Int]): List[Array[Item]] = {
- pattern.span(_ != DELIMITER) match {
- case (x, xs) if xs.size > 1 => x.map(intToItem).toArray ::
toPublicRepr(xs.tail)
- case (x, xs) => List(x.map(intToItem).toArray)
+ val totalCount = data.count()
+ logInfo(s"number of sequences: $totalCount")
+ val minCount = math.ceil(minSupport * totalCount).toLong
+ logInfo(s"minimum count for a frequent pattern: $minCount")
+
+ // Find frequent items.
+ val freqItemAndCounts = data.flatMap { itemsets =>
+ val uniqItems = mutable.Set.empty[Item]
+ itemsets.foreach { _.foreach { item =>
+ uniqItems += item
+ }}
+ uniqItems.toIterator.map((_, 1L))
+ }.reduceByKey(_ + _)
+ .filter { case (_, count) =>
+ count >= minCount
+ }.collect()
+ val freqItems = freqItemAndCounts.sortBy(-_._2).map(_._1)
+ logInfo(s"number of frequent items: ${freqItems.size}")
+
+ // Keep only frequent items from input sequences and convert them to
internal storage.
+ val itemToInt = freqItems.zipWithIndex.toMap
+ val dataInternalRepr = data.map { itemsets =>
+ val allItems = mutable.ArrayBuilder.make[Int]
+ allItems += 0
+ itemsets.foreach { itemsets =>
+ val items = mutable.ArrayBuilder.make[Int]
+ itemsets.foreach { item =>
+ if (itemToInt.contains(item)) {
+ items += itemToInt(item) + 1 // using 1-indexing in internal
format
+ }
+ }
+ val result = items.result()
+ if (result.nonEmpty) {
+ allItems ++= result.sorted
+ }
+ allItems += 0
+ }
+ allItems.result()
--- End diff --
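Should we filter out sequences in which every itemset contains only infrequent items (e.g., with `flatMap` returning an `Option`)?
Note that `allItems.nonEmpty` would not work as the check. `allItems` always starts with the `0` delimiter, so it is never empty. The filter should instead test whether `allItems` contains any nonzero (i.e., non-delimiter) element.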
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]