Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/4047#discussion_r23980335
--- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import scala.collection.mutable
+
+import java.text.BreakIterator
+
+import scopt.OptionParser
+
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.mllib.clustering.LDA
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * An example Latent Dirichlet Allocation (LDA) app. Run with
+ * {{{
+ * ./bin/run-example mllib.LDAExample [options] <input>
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object LDAExample {
+
+ private case class Params(
+ input: Seq[String] = Seq.empty,
+ k: Int = 20,
+ maxIterations: Int = 10,
+ docConcentration: Double = -1,
+ topicConcentration: Double = -1,
+ vocabSize: Int = 10000,
+ stopwordFile: String = "",
+ checkpointDir: Option[String] = None,
+ checkpointInterval: Int = 10) extends AbstractParams[Params]
+
+ def main(args: Array[String]) {
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("LDAExample") {
+ head("LDAExample: an example LDA app for plain text data.")
+ opt[Int]("k")
+ .text(s"number of topics. default: ${defaultParams.k}")
+ .action((x, c) => c.copy(k = x))
+ opt[Int]("maxIterations")
+ .text(s"number of iterations of learning. default:
${defaultParams.maxIterations}")
+ .action((x, c) => c.copy(maxIterations = x))
+ opt[Double]("docConcentration")
+ .text(s"amount of topic smoothing to use (> 1.0) (-1=auto)." +
+ s" default: ${defaultParams.docConcentration}")
+ .action((x, c) => c.copy(docConcentration = x))
+ opt[Double]("topicConcentration")
+ .text(s"amount of term (word) smoothing to use (> 1.0) (-1=auto)."
+
+ s" default: ${defaultParams.topicConcentration}")
+ .action((x, c) => c.copy(topicConcentration = x))
+ opt[Int]("vocabSize")
+ .text(s"number of distinct word types to use, chosen by frequency.
(-1=all)" +
+ s" default: ${defaultParams.vocabSize}")
+ .action((x, c) => c.copy(vocabSize = x))
+ opt[String]("stopwordFile")
+ .text(s"filepath for a list of stopwords. Note: This must fit on a
single machine." +
+ s" default: ${defaultParams.stopwordFile}")
+ .action((x, c) => c.copy(stopwordFile = x))
+ opt[String]("checkpointDir")
+ .text(s"Directory for checkpointing intermediate results." +
+ s" Checkpointing helps with recovery and eliminates temporary
shuffle files on disk." +
+ s" default: ${defaultParams.checkpointDir}")
+ .action((x, c) => c.copy(checkpointDir = Some(x)))
+ opt[Int]("checkpointInterval")
+ .text(s"Iterations between each checkpoint. Only used if
checkpointDir is set." +
+ s" default: ${defaultParams.checkpointInterval}")
+ .action((x, c) => c.copy(checkpointInterval = x))
+ arg[String]("<input>...")
+ .text("input paths (directories) to plain text corpora." +
+ " Each text file line should hold 1 document.")
+ .unbounded()
+ .required()
+ .action((x, c) => c.copy(input = c.input :+ x))
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ }.getOrElse {
+ parser.showUsageAsError
+ sys.exit(1)
+ }
+ }
+
+ private def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"LDAExample with $params")
+ val sc = new SparkContext(conf)
+
+ Logger.getRootLogger.setLevel(Level.WARN)
+
+ // Load documents, and prepare them for LDA.
+ val preprocessStart = System.nanoTime()
+ val (corpus, vocabArray, actualNumTokens) =
+ preprocess(sc, params.input, params.vocabSize, params.stopwordFile)
+ corpus.cache()
+ val actualCorpusSize = corpus.count()
+ val actualVocabSize = vocabArray.size
+ val preprocessElapsed = (System.nanoTime() - preprocessStart) / 1e9
+
+ println()
+ println(s"Corpus summary:")
+ println(s"\t Training set size: $actualCorpusSize documents")
+ println(s"\t Vocabulary size: $actualVocabSize terms")
+ println(s"\t Training set size: $actualNumTokens tokens")
+ println(s"\t Preprocessing time: $preprocessElapsed sec")
+ println()
+
+ // Run LDA.
+ val lda = new LDA()
+ lda.setK(params.k)
+ .setMaxIterations(params.maxIterations)
+ .setDocConcentration(params.docConcentration)
+ .setTopicConcentration(params.topicConcentration)
+ .setCheckpointInterval(params.checkpointInterval)
+ if (params.checkpointDir.nonEmpty) {
+ lda.setCheckpointDir(params.checkpointDir.get)
+ }
+ val startTime = System.nanoTime()
+ val ldaModel = lda.run(corpus)
+ val elapsed = (System.nanoTime() - startTime) / 1e9
+
+ println(s"Finished training LDA model. Summary:")
+ println(s"\t Training time: $elapsed sec")
+ val avgLogLikelihood = ldaModel.logLikelihood / actualCorpusSize.toDouble
+ println(s"\t Training data average log likelihood: $avgLogLikelihood")
+ println()
+
+ // Print the topics, showing the top-weighted terms for each topic.
+ val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
+ val topics = topicIndices.map { case (terms, termWeights) =>
+ terms.zip(termWeights).map { case (term, weight) => (vocabArray(term.toInt), weight) }
+ }
+ println(s"${params.k} topics:")
+ topics.zipWithIndex.foreach { case (topic, i) =>
+ println(s"TOPIC $i")
+ topic.foreach { case (term, weight) =>
+ println(s"$term\t$weight")
+ }
+ println()
+ }
+
+ }
+
+ /**
+ * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
+ * @return (corpus, vocabulary as array, total token count in corpus)
+ */
+ private def preprocess(
+ sc: SparkContext,
+ paths: Seq[String],
+ vocabSize: Int,
+ stopwordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
+
+ // Get dataset of document texts
+ // One document per line in each text file.
+ val files: Seq[RDD[String]] = for (p <- paths) yield {
+ sc.textFile(p)
+ }
+ val textRDD: RDD[String] = files.reduce(_ ++ _) // combine results from multiple paths
+
+ // Split text into words
+ val tokenizer = new SimpleTokenizer(sc, stopwordFile)
+ val tokenized: RDD[(Long, IndexedSeq[String])] = textRDD.zipWithIndex().map { case (text, id) =>
+ id -> tokenizer.getWords(text)
+ }
+ tokenized.cache()
+
+ // Counts words: RDD[(word, wordCount)]
+ val wordCounts: RDD[(String, Long)] = tokenized
+ .flatMap { case (_, tokens) => tokens.map(_ -> 1L) }
+ .reduceByKey(_ + _)
+ // Sort words, and select vocab
+ val sortedWC = wordCounts.sortBy(_._2, ascending = false)
+ val selectedWC: Array[(String, Long)] = if (vocabSize == -1) {
+ sortedWC.collect()
--- End diff --
minor: count first, then decide whether to sort, since sorting is the more expensive step. If you do want the indices ordered by word frequency, note that `collect()` doesn't guarantee the ordering of results.
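For illustration, something along these lines could work (just a sketch reusing the `wordCounts` and `vocabSize` names from the diff; `fullVocabSize` and the use of `top()` are my own suggestions, not part of the patch):

```scala
// Count the distinct terms first, and only pay for ordering when the
// vocabulary actually has to be truncated.
val fullVocabSize = wordCounts.count()
val selectedWC: Array[(String, Long)] =
  if (vocabSize == -1 || fullVocabSize <= vocabSize) {
    // No truncation needed, so skip the distributed sort entirely.
    // (Sort locally on the driver if frequency-ordered indices are still wanted.)
    wordCounts.collect()
  } else {
    // top() returns the vocabSize most frequent terms, already ordered by
    // count on the driver, so the ordering is explicit instead of relying
    // on collect() after sortBy.
    wordCounts.top(vocabSize)(Ordering.by[(String, Long), Long](_._2))
  }
```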