Github user jkbradley commented on a diff in the pull request:
https://github.com/apache/spark/pull/8551#discussion_r46874564
--- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala ---
@@ -192,115 +192,45 @@ object LDAExample {
       vocabSize: Int,
       stopwordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
+    val sqlContext = SQLContext.getOrCreate(sc)
+    import sqlContext.implicits._
+
     // Get dataset of document texts
     // One document per line in each text file. If the input consists of many small files,
     // this can result in a large number of small partitions, which can degrade performance.
     // In this case, consider using coalesce() to create fewer, larger partitions.
-    val textRDD: RDD[String] = sc.textFile(paths.mkString(","))
-
-    // Split text into words
-    val tokenizer = new SimpleTokenizer(sc, stopwordFile)
-    val tokenized: RDD[(Long, IndexedSeq[String])] = textRDD.zipWithIndex().map { case (text, id) =>
-      id -> tokenizer.getWords(text)
-    }
-    tokenized.cache()
-
-    // Counts words: RDD[(word, wordCount)]
-    val wordCounts: RDD[(String, Long)] = tokenized
-      .flatMap { case (_, tokens) => tokens.map(_ -> 1L) }
-      .reduceByKey(_ + _)
-    wordCounts.cache()
-    val fullVocabSize = wordCounts.count()
-    // Select vocab
-    // (vocab: Map[word -> id], total tokens after selecting vocab)
-    val (vocab: Map[String, Int], selectedTokenCount: Long) = {
-      val tmpSortedWC: Array[(String, Long)] = if (vocabSize == -1 || fullVocabSize <= vocabSize) {
-        // Use all terms
-        wordCounts.collect().sortBy(-_._2)
-      } else {
-        // Sort terms to select vocab
-        wordCounts.sortBy(_._2, ascending = false).take(vocabSize)
-      }
-      (tmpSortedWC.map(_._1).zipWithIndex.toMap, tmpSortedWC.map(_._2).sum)
+    val df = sc.textFile(paths.mkString(",")).toDF("docs")
+    val customizedStopWords: Array[String] = if (stopwordFile.isEmpty) {
+      Array.empty[String]
+    } else {
+      val stopWordText = sc.textFile(stopwordFile).collect()
+      stopWordText.flatMap(_.stripMargin.split("\\s+"))
     }
-
-    val documents = tokenized.map { case (id, tokens) =>
-      // Filter tokens by vocabulary, and create word count vector representation of document.
-      val wc = new mutable.HashMap[Int, Int]()
-      tokens.foreach { term =>
-        if (vocab.contains(term)) {
-          val termIndex = vocab(term)
-          wc(termIndex) = wc.getOrElse(termIndex, 0) + 1
-        }
-      }
-      val indices = wc.keys.toArray.sorted
-      val values = indices.map(i => wc(i).toDouble)
-
-      val sb = Vectors.sparse(vocab.size, indices, values)
-      (id, sb)
-    }
-
-    val vocabArray = new Array[String](vocab.size)
-    vocab.foreach { case (term, i) => vocabArray(i) = term }
-
-    (documents, vocabArray, selectedTokenCount)
+    val tokenizer = new RegexTokenizer()
+      .setInputCol("docs")
+      .setOutputCol("rawTokens")
+    val stopWordsRemover = new StopWordsRemover()
+      .setInputCol("rawTokens")
+      .setOutputCol("tokens")
+    stopWordsRemover.setStopWords(stopWordsRemover.getStopWords ++ customizedStopWords)
+    val countVectorizer = new CountVectorizer()
+      .setVocabSize(vocabSize)
+      .setInputCol("tokens")
+      .setOutputCol("vectors")
--- End diff ---
How about calling these "features" instead of "vectors"?
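For illustration, a minimal sketch of how the rename could look end to end. The Pipeline wiring and the extraction of the documents RDD below are assumptions added for context, not lines from this diff; "features" matches the default features column name that spark.ml estimators expect:

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.Row

    // Sketch only: assumes the suggested rename from "vectors" to "features".
    val countVectorizer = new CountVectorizer()
      .setVocabSize(vocabSize)
      .setInputCol("tokens")
      .setOutputCol("features")

    // Assumed downstream wiring, not part of the diff above: compose the
    // stages, fit them, and read the renamed column back out as an RDD.
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, stopWordsRemover, countVectorizer))
    val model = pipeline.fit(df)
    val documents = model.transform(df)
      .select("features")
      .rdd
      .map { case Row(features: Vector) => features }
      .zipWithIndex()
      .map(_.swap) // (docId, termCountVector), matching the declared return type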