Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4047#discussion_r23829150
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala 
---
    @@ -0,0 +1,473 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV, normalize, axpy => brzAxpy}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.graphx._
    +import org.apache.spark.graphx.impl.GraphImpl
    +import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * Latent Dirichlet Allocation (LDA), a topic model designed for text 
documents.
    + *
    + * Terminology:
    + *  - "word" = "term": an element of the vocabulary
    + *  - "token": instance of a term appearing in a document
    + *  - "topic": multinomial distribution over words representing some 
concept
    + *
    + * Currently, the underlying implementation uses Expectation-Maximization 
(EM), implemented
    + * according to the Asuncion et al. (2009) paper referenced below.
    + *
    + * References:
    + *  - Original LDA paper (journal version):
    + *    Blei, Ng, and Jordan.  "Latent Dirichlet Allocation."  JMLR, 2003.
    + *     - This class implements their "smoothed" LDA model.
    + *  - Paper which clearly explains several algorithms, including EM:
    + *    Asuncion, Welling, Smyth, and Teh.
    + *    "On Smoothing and Inference for Topic Models."  UAI, 2009.
    + *
    + * NOTE: This is currently marked DeveloperApi since it is under active 
development and may undergo
    + *       API changes.
    + */
    +@DeveloperApi
    +class LDA private (
    +    private var k: Int,
    +    private var maxIterations: Int,
    +    private var docConcentration: Double,
    +    private var topicConcentration: Double,
    +    private var seed: Long,
    +    private var checkpointDir: Option[String],
    +    private var checkpointInterval: Int) extends Logging {
    +
    +  import LDA._
    +
    +  def this() = this(k = 10, maxIterations = 20, docConcentration = -1, 
topicConcentration = -1,
    +    seed = Utils.random.nextLong(), checkpointDir = None, 
checkpointInterval = 10)
    +
    +  /**
    +   * Number of topics to infer.  I.e., the number of soft cluster centers.
    +   * (default = 10)
    +   */
    +  def getK: Int = k
    +
    +  def setK(k: Int): this.type = {
    +    require(k > 0, s"LDA k (number of clusters) must be > 0, but was set 
to $k")
    +    this.k = k
    +    this
    +  }
    +
    +  /**
    +   * Topic smoothing parameter (commonly named "alpha").
    +   *
    +   * This is the parameter to the Dirichlet prior placed on each 
document's distribution over topics
    +   * ("theta").  We use a symmetric Dirichlet prior.
    +   *
    +   * This value should be > 1.0, where larger values mean more smoothing 
(more regularization).
    +   * If set to -1, then docConcentration is set automatically.
    +   *  (default = -1 = automatic)
    +   *
    +   * Automatic setting of parameter:
    +   *  - For EM: default = (50 / k) + 1.
    +   *     - The 50/k is common in LDA libraries.
    +   *     - The +1 follows Asuncion et al. (2009), who recommend a +1 
adjustment for EM.
    +   *
    +   * Note: The restriction > 1.0 may be relaxed in the future (allowing 
sparse solutions),
    +   *       but values in (0,1) are not yet supported.
    +   */
    +  def getDocConcentration: Double = {
    +    if (this.docConcentration == -1) {
    +      (50.0 / k) + 1.0
    +    } else {
    +      this.docConcentration
    +    }
    +  }
    +
    +  def setDocConcentration(docConcentration: Double): this.type = {
    +    require(docConcentration > 1.0 || docConcentration == -1.0,
    +      s"LDA docConcentration must be > 1.0 (or -1 for auto), but was set 
to $docConcentration")
    +    this.docConcentration = docConcentration
    +    this
    +  }
    +
    +  /** Alias for [[getDocConcentration]] */
    +  def getAlpha: Double = getDocConcentration
    +
    +  /** Alias for [[setDocConcentration()]] */
    +  def setAlpha(alpha: Double): this.type = setDocConcentration(alpha)
    +
    +  /**
    +   * Term smoothing parameter (commonly named "beta" or "eta").
    +   *
    +   * This is the parameter to the Dirichlet prior placed on each topic's 
distribution over terms
    +   * (which are called "beta" in the original LDA paper by Blei et al., 
but are called "phi" in many
    +   *  later papers such as Asuncion et al., 2009).
    +   *
    +   * This value should be > 0.0.
    +   * If set to -1, then topicConcentration is set automatically.
    +   *  (default = -1 = automatic)
    +   *
    +   * Automatic setting of parameter:
    +   *  - For EM: default = 0.1 + 1.
    +   *     - The 0.1 gives a small amount of smoothing.
    +   *     - The +1 follows Asuncion et al. (2009), who recommend a +1 
adjustment for EM.
    +   *
    +   * Note: The restriction > 1.0 may be relaxed in the future (allowing 
sparse solutions),
    +   *       but values in (0,1) are not yet supported.
    +   */
    +  def getTopicConcentration: Double = {
    +    if (this.topicConcentration == -1) {
    +      1.1
    +    } else {
    +      this.topicConcentration
    +    }
    +  }
    +
    +  def setTopicConcentration(topicConcentration: Double): this.type = {
    +    require(topicConcentration > 1.0 || topicConcentration == -1.0,
    +      s"LDA topicConcentration must be > 1.0 (or -1 for auto), but was set 
to $topicConcentration")
    +    this.topicConcentration = topicConcentration
    +    this
    +  }
    +
    +  /** Alias for [[getTopicConcentration]] */
    +  def getBeta: Double = getTopicConcentration
    +
    +  /** Alias for [[setTopicConcentration()]] */
    +  def setBeta(beta: Double): this.type = setBeta(beta)
    +
    +  /**
    +   * Maximum number of iterations for learning.
    +   * (default = 20)
    +   */
    +  def getMaxIterations: Int = maxIterations
    +
    +  def setMaxIterations(maxIterations: Int): this.type = {
    +    this.maxIterations = maxIterations
    +    this
    +  }
    +
    +  /** Random seed */
    +  def getSeed: Long = seed
    +
    +  def setSeed(seed: Long): this.type = {
    +    this.seed = seed
    +    this
    +  }
    +
    +  /**
    +   * Directory for storing checkpoint files during learning.
    +   * This is not necessary, but checkpointing helps with recovery (when 
nodes fail).
    +   * It also helps with eliminating temporary shuffle files on disk, which 
can be important when
    +   * LDA is run for many iterations.
    +   */
    +  def getCheckpointDir: Option[String] = checkpointDir
    +
    +  def setCheckpointDir(checkpointDir: String): this.type = {
    +    this.checkpointDir = Some(checkpointDir)
    +    this
    +  }
    +
    +  def clearCheckpointDir(): this.type = {
    +    this.checkpointDir = None
    +    this
    +  }
    +
    +  /**
    +   * Period (in iterations) between checkpoints.
    +   * @see [[getCheckpointDir]]
    +   */
    +  def getCheckpointInterval: Int = checkpointInterval
    +
    +  def setCheckpointInterval(checkpointInterval: Int): this.type = {
    +    this.checkpointInterval = checkpointInterval
    +    this
    +  }
    +
    +  /**
    +   * Learn an LDA model using the given dataset.
    +   *
    +   * @param documents  RDD of documents, which are term (word) count 
vectors paired with IDs.
    +   *                   The term count vectors are "bags of words" with a 
fixed-size vocabulary
    +   *                   (where the vocabulary size is the length of the 
vector).
    +   *                   Document IDs must be unique and >= 0.
    +   * @return  Inferred LDA model
    +   */
    +  def run(documents: RDD[(Long, Vector)]): DistributedLDAModel = {
    +    val state = LDA.initialState(documents, k, getDocConcentration, 
getTopicConcentration, seed,
    +      checkpointDir, checkpointInterval)
    +    var iter = 0
    +    val iterationTimes = Array.fill[Double](maxIterations)(0)
    +    while (iter < maxIterations) {
    +      val start = System.nanoTime()
    +      state.next()
    +      val elapsedSeconds = (System.nanoTime() - start) / 1e9
    +      iterationTimes(iter) = elapsedSeconds
    +      iter += 1
    +    }
    +    state.graphCheckpointer.deleteAllCheckpoints()
    +    new DistributedLDAModel(state, iterationTimes)
    +  }
    +}
    +
    +
    +object LDA {
    --- End diff --
    
    `private[recommendation]`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to