Github user feynmanliang commented on a diff in the pull request:
https://github.com/apache/spark/pull/7937#discussion_r36244340
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -139,200 +202,308 @@ class PrefixSpan private (
run(data.rdd.map(_.asScala.map(_.asScala.toArray).toArray))
}
+}
+
+@Experimental
+object PrefixSpan extends Logging {
+
/**
- * Find the complete set of sequential patterns in the input sequences. This method utilizes
- * the internal representation of itemsets as Array[Int] where each itemset is represented by
- * a contiguous sequence of non-negative integers and delimiters represented by [[DELIMITER]].
- * @param data ordered sequences of itemsets. Items are represented by non-negative integers.
- *             Each itemset has one or more items and is delimited by [[DELIMITER]].
- * @return a set of sequential pattern pairs,
- * the key of pair is pattern (a list of elements),
- * the value of pair is the pattern's count.
+ * Find the complete set of frequent sequential patterns in the input sequences.
+ * @param data ordered sequences of itemsets. We represent a sequence internally as Array[Int],
+ *             where each itemset is represented by a contiguous sequence of distinct and ordered
+ *             positive integers. We use 0 as the delimiter at itemset boundaries, including the
+ *             first and the last position.
+ * @return an RDD of (frequent sequential pattern, count) pairs,
+ * @see [[Postfix]]
*/
- private[fpm] def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+ private[fpm] def genFreqPatterns(
+ data: RDD[Array[Int]],
+ minCount: Long,
+ maxPatternLength: Int,
+ maxLocalProjDBSize: Long): RDD[(Array[Int], Long)] = {
val sc = data.sparkContext
if (data.getStorageLevel == StorageLevel.NONE) {
logWarning("Input data is not cached.")
}
- // Use List[Set[Item]] for internal computation
- val sequences = data.map { seq => splitSequence(seq.toList) }
-
- // Convert min support to a min number of transactions for this dataset
- val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong
-
- // (Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
- val freqItemCounts = sequences
- .flatMap(seq => seq.flatMap(nonemptySubsets(_)).distinct.map(item => (item, 1L)))
- .reduceByKey(_ + _)
- .filter { case (item, count) => (count >= minCount) }
- .collect()
- .toMap
-
- // Pairs of (length 1 prefix, suffix consisting of frequent items)
- val itemSuffixPairs = {
- val freqItemSets = freqItemCounts.keys.toSet
- val freqItems = freqItemSets.flatten
- sequences.flatMap { seq =>
- val filteredSeq = seq.map(item => freqItems.intersect(item)).filter(_.nonEmpty)
- freqItemSets.flatMap { item =>
- val candidateSuffix = LocalPrefixSpan.getSuffix(item, filteredSeq)
- candidateSuffix match {
- case suffix if !suffix.isEmpty => Some((List(item), suffix))
- case _ => None
+ val postfixes = data.map(items => new Postfix(items))
+
+ // Local frequent patterns (prefixes) and their counts.
+ val localFreqPatterns = mutable.ArrayBuffer.empty[(Array[Int], Long)]
+ // Prefixes whose projected databases are small.
+ val smallPrefixes = mutable.Map.empty[Int, Prefix]
+ val emptyPrefix = Prefix.empty
+ // Prefixes whose projected databases are large.
+ var largePrefixes = mutable.Map(emptyPrefix.id -> emptyPrefix)
+ while (largePrefixes.nonEmpty) {
+ val numLocalFreqPatterns = localFreqPatterns.length
+ logInfo(s"number of local frequent patterns: $numLocalFreqPatterns")
+ if (localFreqPatterns.length > 1000000) {
+ logWarning(
+ s"""
+ | Collected $numLocalFreqPatterns local frequent patterns. You may want to consider:
+ | 1. increase minSupport,
+ | 2. decrease maxPatternLength,
+ | 3. increase maxLocalProjDBSize.
+ """.stripMargin)
+ }
+ logInfo(s"number of small prefixes: ${smallPrefixes.size}")
+ logInfo(s"number of large prefixes: ${largePrefixes.size}")
+ val largePrefixArray = largePrefixes.values.toArray
+ val freqPrefixes = postfixes.flatMap { postfix =>
+ largePrefixArray.flatMap { prefix =>
+ postfix.project(prefix).genPrefixItems.map { case (item, postfixSize) =>
+ ((prefix.id, item), (1L, postfixSize))
+ }
+ }
+ }.reduceByKey { case ((c0, s0), (c1, s1)) =>
+ (c0 + c1, s0 + s1)
--- End diff ---
Could `c0 + c1` possibly collide with an existing prefix id? It may be better to just call `Prefix.nextId` here.
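
For reference, the kind of collision-free counter I have in mind is just an atomic increment. A minimal sketch (the field name and visibility here are my assumptions, not necessarily what this PR's `Prefix` companion actually looks like):

```scala
import java.util.concurrent.atomic.AtomicInteger

object Prefix {
  // Monotonically increasing counter; every id handed out is distinct,
  // so prefix ids can never collide as long as they all come from here.
  private val counter = new AtomicInteger(-1)

  /** Returns the next globally unique prefix id. */
  private[fpm] def nextId: Int = counter.incrementAndGet()
}
```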
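On a side note, the new scaladoc's encoding convention might be easier to digest with a concrete example. A hypothetical helper, purely illustrative and not part of this diff, assuming the documented convention of 0-delimited itemsets:

```scala
// Encodes a sequence of itemsets using the convention documented above:
// each itemset is a run of distinct, increasing positive integers, and 0
// delimits every itemset boundary, including the first and last positions.
def encode(sequence: Seq[Set[Int]]): Array[Int] =
  sequence.foldLeft(Array(0)) { (acc, itemset) =>
    (acc ++ itemset.toArray.sorted) :+ 0
  }

// The sequence <(1, 2), (3)> becomes Array(0, 1, 2, 0, 3, 0).
assert(encode(Seq(Set(1, 2), Set(3))).sameElements(Array(0, 1, 2, 0, 3, 0)))
```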