Dong Wang created SPARK-29817:
---------------------------------

             Summary: Missing persist on docs in mllib.clustering.LDAOptimizer.initialize
                 Key: SPARK-29817
                 URL: https://issues.apache.org/jira/browse/SPARK-29817
             Project: Spark
          Issue Type: Improvement
          Components: MLlib
    Affects Versions: 2.4.3
            Reporter: Dong Wang

The RDD docs in mllib.clustering.LDAOptimizer.initialize is used twice: first through verticesTMP.reduceByKey (verticesTMP depends on edges, which depends on docs), and again in docs.take(1). It should be persisted so that it is not recomputed for the second use.

{code:scala}
  override private[clustering] def initialize(
      docs: RDD[(Long, Vector)],
      lda: LDA): EMLDAOptimizer = {
    ...
    val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
      // Add edges for terms with non-zero counts.
      termCounts.asBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
        Edge(docID, term2index(term), cnt)
      }
    }
    // Create vertices.
    // Initially, we use random soft assignments of tokens to topics (random gamma).
    val docTermVertices: RDD[(VertexId, TopicCounts)] = {
      val verticesTMP: RDD[(VertexId, TopicCounts)] =
        edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
          val random = new Random(partIndex + randomSeed)
          partEdges.flatMap { edge =>
            val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)
            val sum = gamma * edge.attr
            Seq((edge.srcId, sum), (edge.dstId, sum))
          }
        }
      verticesTMP.reduceByKey(_ + _) // RDD dependency: verticesTMP - edges - docs. First use of docs
    }
    // Partition such that edges are grouped by document
    this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
    this.k = k
    this.vocabSize = docs.take(1).head._2.size // Second use of docs
{code}

This issue was reported by our tool CacheCheck, which dynamically detects misuses of the persist()/unpersist() API.
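
For illustration only, here is a minimal sketch of the kind of change the report asks for. It is not the actual patch to LDAOptimizer. The object name PersistDocsSketch, the helper nonZeroCountAndVocabSize, and the MEMORY_AND_DISK storage level are assumptions made for this example. The first use is written as a plain action (reduce) so that both uses of docs run while it is persisted. In the real initialize method, reduceByKey is a lazy transformation, so the right place to call unpersist() would depend on when the graph is first materialized.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical example object; not part of Spark.
object PersistDocsSketch {

  /** Uses `docs` in two separate jobs and persists it so the second job can reuse cached partitions. */
  def nonZeroCountAndVocabSize(docs: RDD[(Long, Vector)]): (Long, Int) = {
    // Respect a storage level chosen by the caller; only persist if none is set.
    val persistedHere = docs.getStorageLevel == StorageLevel.NONE
    if (persistedHere) docs.persist(StorageLevel.MEMORY_AND_DISK) // assumed level

    try {
      // First use of docs: total number of non-zero term counts (stands in for building edges).
      val nonZero = docs.map { case (_, termCounts) => termCounts.numNonzeros.toLong }.reduce(_ + _)
      // Second use of docs: vocabulary size read from the first document.
      val vocabSize = docs.take(1).head._2.size
      (nonZero, vocabSize)
    } finally {
      // Release the cache only if this method created it.
      if (persistedHere) docs.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("persist-docs-sketch")
    val sc = new SparkContext(conf)
    try {
      val docs: RDD[(Long, Vector)] = sc.parallelize(Seq(
        (0L, Vectors.dense(1.0, 0.0, 2.0)),
        (1L, Vectors.dense(0.0, 3.0, 0.0))
      ))
      val (nonZero, vocabSize) = nonZeroCountAndVocabSize(docs)
      println(s"non-zero term counts: $nonZero, vocabulary size: $vocabSize")
    } finally {
      sc.stop()
    }
  }
}
```

Checking getStorageLevel first avoids overriding a storage level the caller has already set, and the try/finally ensures the cache is released even if one of the jobs fails.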