Github user jkbradley commented on a diff in the pull request:
https://github.com/apache/spark/pull/4419#discussion_r29296402
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
new DistributedLDAModel(this, iterationTimes)
}
}
+
+
+/**
+ * :: Experimental ::
+ *
+ * An online optimizer for LDA. The Optimizer implements the Online LDA
algorithm, which
+ * processes a subset of the corpus by each call to next, and update the
term-topic
+ * distribution adaptively.
+ *
+ * References:
+ * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet
Allocation." NIPS, 2010.
+ */
+@Experimental
+class OnlineLDAOptimizer extends LDAOptimizer {
+
+ // LDA common parameters
+ private var k: Int = 0
+ private var D: Int = 0
+ private var vocabSize: Int = 0
+ private var alpha: Double = 0
+ private var eta: Double = 0
+ private var randomSeed: Long = 0
+
+ // Online LDA specific parameters
+ private var tau_0: Double = -1
+ private var kappa: Double = -1
+ private var batchSize: Int = -1
+
+ // internal data structure
+ private var docs: RDD[(Long, Vector)] = null
+ private var lambda: BDM[Double] = null
+ private var Elogbeta: BDM[Double]= null
+ private var expElogbeta: BDM[Double] = null
+
+ // count of invocation to next, used to help deciding the weight for
each iteration
+ private var iteration = 0
+
+ /**
+ * A (positive) learning parameter that downweights early iterations
+ */
+ def getTau_0: Double = {
+ if (this.tau_0 == -1) {
+ 1024
+ } else {
+ this.tau_0
+ }
+ }
+
+ /**
+ * A (positive) learning parameter that downweights early iterations
+ * Automatic setting of parameter:
+ * - default = 1024, which follows the recommendation from OnlineLDA
paper.
+ */
+ def setTau_0(tau_0: Double): this.type = {
+ require(tau_0 > 0 || tau_0 == -1.0, s"LDA tau_0 must be positive, but
was set to $tau_0")
+ this.tau_0 = tau_0
+ this
+ }
+
+ /**
+ * Learning rate: exponential decay rate
+ */
+ def getKappa: Double = {
+ if (this.kappa == -1) {
+ 0.5
+ } else {
+ this.kappa
+ }
+ }
+
+ /**
+ * Learning rate: exponential decay rate---should be between
+ * (0.5, 1.0] to guarantee asymptotic convergence.
+ * - default = 0.5, which follows the recommendation from OnlineLDA
paper.
+ */
+ def setKappa(kappa: Double): this.type = {
+ require(kappa >= 0 || kappa == -1.0,
+ s"Online LDA kappa must be nonnegative (or -1 for auto), but was set
to $kappa")
+ this.kappa = kappa
+ this
+ }
+
+ /**
+ * Mini-batch size, which controls how many documents are used in each
iteration
+ */
+ def getBatchSize: Int = {
+ if (this.batchSize == -1) {
+ D / 100
+ } else {
+ this.batchSize
+ }
+ }
+
+ /**
+ * Mini-batch size, which controls how many documents are used in each
iteration
+ * default = 1% from total documents.
+ */
+ def setBatchSize(batchSize: Int): this.type = {
+ this.batchSize = batchSize
+ this
+ }
+
+ private[clustering] override def initialize(docs: RDD[(Long, Vector)],
lda: LDA): LDAOptimizer = {
+
+ this.k = lda.getK
+ this.D = docs.count().toInt
+ this.vocabSize = docs.first()._2.size
+ this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else
lda.getDocConcentration
+ this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else
lda.getTopicConcentration
+ this.randomSeed = randomSeed
+
+ this.docs = docs
+
+ // Initialize the variational distribution q(beta|lambda)
+ this.lambda = getGammaMatrix(k, vocabSize)
+ this.Elogbeta = dirichlet_expectation(lambda)
+ this.expElogbeta = exp(Elogbeta)
+ this.iteration = 0
+ this
+ }
+
+ /**
+ * Submit a a subset (like 1%, decide by the batchSize) of the corpus to
the Online LDA model,
+ * and it will update the topic distribution adaptively for the terms
appearing in the subset.
+ *
+ * @return Inferred LDA model
+ */
+ private[clustering] override def next(): OnlineLDAOptimizer = {
+ iteration += 1
+ val batchSize = getBatchSize
+ val batch = docs.sample(true, batchSize.toDouble / D,
randomSeed).cache()
+ if(batch.isEmpty()) return this
+
+ val k = this.k
+ val vocabSize = this.vocabSize
+ val expElogbeta = this.expElogbeta
+ val alpha = this.alpha
+
+ val stats = batch.mapPartitions(docs =>{
+ val stat = BDM.zeros[Double](k, vocabSize)
+ docs.foreach(doc =>{
+ val termCounts = doc._2
+ val (ids, cts) = termCounts match {
+ case v: DenseVector => (((0 until v.size).toList), v.values)
+ case v: SparseVector => (v.indices.toList, v.values)
+ case v => throw new IllegalArgumentException("Do not support
vector type " + v.getClass)
+ }
+
+ // Initialize the variational distribution q(theta|gamma) for the
mini-batch
+ var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 *
K
+ var Elogthetad = digamma(gammad) - digamma(sum(gammad)) // 1 *
K
+ var expElogthetad = exp(Elogthetad) // 1 *
K
+ val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K *
ids
+
+ var phinorm = expElogthetad * expElogbetad + 1e-100 // 1 *
ids
+ var meanchange = 1D
+ val ctsVector = new BDV[Double](cts).t // 1 *
ids
+
+ // Iterate between gamma and phi until convergence
+ while (meanchange > 1e-5) {
+ val lastgamma = gammad
+ // 1*K 1 * ids ids * k
+ gammad = (expElogthetad :* ((ctsVector / phinorm) *
(expElogbetad.t))) + alpha
+ Elogthetad = digamma(gammad) - digamma(sum(gammad))
+ expElogthetad = exp(Elogthetad)
+ phinorm = expElogthetad * expElogbetad + 1e-100
+ meanchange = sum(abs(gammad - lastgamma)) / k
+ }
+
+ val m1 = expElogthetad.t.toDenseMatrix.t
+ val m2 = (ctsVector / phinorm).t.toDenseMatrix
+ val outerResult = kron(m1, m2) // K * ids
+ for (i <- 0 until ids.size) {
+ stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
+ }
+ stat
+ })
+ Iterator(stat)
+ })
+
+ val batchResult = stats.reduce(_ += _)
+ update(batchResult, iteration, batchSize)
+ batch.unpersist()
+ this
+ }
+
+ private[clustering] override def getLDAModel(iterationTimes:
Array[Double]): LDAModel = {
+ new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
+ }
+
+ private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
+
+ val tau_0 = this.getTau_0
+ val kappa = this.getKappa
+
+ // weight of the mini-batch.
+ val weight = math.pow(tau_0 + iter, -kappa)
+
+ // This step finishes computing the sufficient statistics for the M
step
+ val stat = raw :* expElogbeta
+
+ // Update lambda based on documents.
+ lambda = lambda * (1 - weight) + (stat * D.toDouble /
batchSize.toDouble + eta) * weight
+ Elogbeta = dirichlet_expectation(lambda)
+ expElogbeta = exp(Elogbeta)
+ }
+
+ private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={
+ val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
+ val temp = gammaRandomGenerator.sample(row * col).toArray
+ (new BDM[Double](col, row, temp)).t
+ }
+
+ private def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = {
--- End diff --
Please add a short doc comment here, even though it's an internal method.
Naming: please use camelCase instead of underscores (e.g. `dirichletExpectation`).
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]