GitHub user BenFradet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8551#discussion_r46874064
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala ---
    @@ -192,115 +192,45 @@ object LDAExample {
           vocabSize: Int,
           stopwordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
     
    +    val sqlContext = SQLContext.getOrCreate(sc)
    +    import sqlContext.implicits._
    +
         // Get dataset of document texts
         // One document per line in each text file. If the input consists of many small files,
         // this can result in a large number of small partitions, which can degrade performance.
         // In this case, consider using coalesce() to create fewer, larger partitions.
    -    val textRDD: RDD[String] = sc.textFile(paths.mkString(","))
    -
    -    // Split text into words
    -    val tokenizer = new SimpleTokenizer(sc, stopwordFile)
     -    val tokenized: RDD[(Long, IndexedSeq[String])] = textRDD.zipWithIndex().map { case (text, id) =>
    -      id -> tokenizer.getWords(text)
    -    }
    -    tokenized.cache()
    -
    -    // Counts words: RDD[(word, wordCount)]
    -    val wordCounts: RDD[(String, Long)] = tokenized
    -      .flatMap { case (_, tokens) => tokens.map(_ -> 1L) }
    -      .reduceByKey(_ + _)
    -    wordCounts.cache()
    -    val fullVocabSize = wordCounts.count()
    -    // Select vocab
    -    //  (vocab: Map[word -> id], total tokens after selecting vocab)
    -    val (vocab: Map[String, Int], selectedTokenCount: Long) = {
     -      val tmpSortedWC: Array[(String, Long)] = if (vocabSize == -1 || fullVocabSize <= vocabSize) {
    -        // Use all terms
    -        wordCounts.collect().sortBy(-_._2)
    -      } else {
    -        // Sort terms to select vocab
    -        wordCounts.sortBy(_._2, ascending = false).take(vocabSize)
    -      }
    -      (tmpSortedWC.map(_._1).zipWithIndex.toMap, tmpSortedWC.map(_._2).sum)
    +    val df = sc.textFile(paths.mkString(",")).toDF("docs")
    +    val customizedStopWords: Array[String] = if (stopwordFile.isEmpty) {
    +      Array.empty[String]
    +    } else {
    +      val stopWordText = sc.textFile(stopwordFile).collect()
    +      stopWordText.flatMap(_.stripMargin.split("\\s+"))
         }
    -
    -    val documents = tokenized.map { case (id, tokens) =>
     -      // Filter tokens by vocabulary, and create word count vector representation of document.
    -      val wc = new mutable.HashMap[Int, Int]()
    -      tokens.foreach { term =>
    -        if (vocab.contains(term)) {
    -          val termIndex = vocab(term)
    -          wc(termIndex) = wc.getOrElse(termIndex, 0) + 1
    -        }
    -      }
    -      val indices = wc.keys.toArray.sorted
    -      val values = indices.map(i => wc(i).toDouble)
    -
    -      val sb = Vectors.sparse(vocab.size, indices, values)
    -      (id, sb)
    -    }
    -
    -    val vocabArray = new Array[String](vocab.size)
    -    vocab.foreach { case (term, i) => vocabArray(i) = term }
    -
    -    (documents, vocabArray, selectedTokenCount)
    +    val tokenizer = new RegexTokenizer()
    +      .setInputCol("docs")
    +      .setOutputCol("rawTokens")
    +    val stopWordsRemover = new StopWordsRemover()
    +      .setInputCol("rawTokens")
    +      .setOutputCol("tokens")
     +    stopWordsRemover.setStopWords(stopWordsRemover.getStopWords ++ customizedStopWords)
    +    val countVectorizer = new CountVectorizer()
    +      .setVocabSize(vocabSize)
    +      .setInputCol("tokens")
    +      .setOutputCol("vectors")
    +
    +    val pipeline = new Pipeline()
    +      .setStages(Array(tokenizer, stopWordsRemover, countVectorizer))
    +
    +    val model = pipeline.fit(df)
    +    val documents = model.transform(df)
    +      .select("vectors")
    +      .map { case Row(features: Vector) => features }
    +      .zipWithIndex()
    +      .map(_.swap)
    +
    +    (documents,
     +      model.stages(2).asInstanceOf[CountVectorizerModel].vocabulary,  // vocabulary
    +      documents.map(_._2.numActives).sum().toLong) // total token count
       }
     }
     
    -/**
    - * Simple Tokenizer.
    - *
     - * TODO: Formalize the interface, and make this a public class in mllib.feature
    - */
     -private class SimpleTokenizer(sc: SparkContext, stopwordFile: String) extends Serializable {
    -
    -  private val stopwords: Set[String] = if (stopwordFile.isEmpty) {
    -    Set.empty[String]
    -  } else {
    -    val stopwordText = sc.textFile(stopwordFile).collect()
    -    stopwordText.flatMap(_.stripMargin.split("\\s+")).toSet
    -  }
    -
    -  // Matches sequences of Unicode letters
    -  private val allWordRegex = "^(\\p{L}*)$".r
    -
    -  // Ignore words shorter than this length.
    -  private val minWordLength = 3
    -
    -  def getWords(text: String): IndexedSeq[String] = {
    -
    -    val words = new mutable.ArrayBuffer[String]()
    -
    -    // Use Java BreakIterator to tokenize text into words.
    -    val wb = BreakIterator.getWordInstance
    -    wb.setText(text)
    -
    -    // current,end index start,end of each word
    -    var current = wb.first()
    -    var end = wb.next()
    -    while (end != BreakIterator.DONE) {
    -      // Convert to lowercase
    -      val word: String = text.substring(current, end).toLowerCase
    -      // Remove short words and strings that aren't only letters
    -      word match {
     -        case allWordRegex(w) if w.length >= minWordLength && !stopwords.contains(w) =>
    -          words += w
    -        case _ =>
    -      }
    -
    -      current = end
    -      try {
    -        end = wb.next()
    -      } catch {
    -        case e: Exception =>
    -          // Ignore remaining text in line.
    -          // This is a known bug in BreakIterator (for some Java versions),
    -          // which fails when it sees certain characters.
    -          end = BreakIterator.DONE
    -      }
    -    }
    -    words
    -  }
    -
    -}
    -// scalastyle:on println
    --- End diff ---
    
    Are you sure about removing the scalastyle line?
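
    For reference, the Spark example sources bracket their `println`-heavy code in a matching off/on pair, so dropping only the closing directive leaves the check disabled through the end of the file and the pair unbalanced. A minimal sketch of the convention (the surrounding code is illustrative, not the actual file):

        // scalastyle:off println
        object LDAExample {
          def main(args: Array[String]): Unit = {
            // printlns are permitted while the check is switched off
            println("Finished LDA example.")
          }
        }
        // scalastyle:on println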

