Github user Krimit commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17673#discussion_r113605682
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala ---
    @@ -194,6 +232,285 @@ final class Word2Vec @Since("1.4.0") (
     
       @Since("1.4.1")
       override def copy(extra: ParamMap): Word2Vec = defaultCopy(extra)
    +
    +  /**
    +   * Similar to InitUnigramTable in the original code. Instead of using an array of size 100 million
    +   * like the original, we size it to be 20 times the vocabulary size.
    +   * We sacrifice memory here to get constant-time lookups into this array when generating
    +   * negative samples.
    +   */
    +  private def generateUnigramTable(normalizedWeights: Array[Double], tableSize: Int): Array[Int] = {
    +    val table = Array.fill(tableSize)(0)
    +    var a = 0
    +    var i = 0
    +    while (a < table.length) {
    +      table.update(a, i)
    +      if (a.toFloat / table.length >= normalizedWeights(i)) {
    +        i = math.min(normalizedWeights.length - 1, i + 1)
    +      }
    +      a += 1
    +    }
    +    table
    +  }
    +
    +  private def generateVocab[S <: Iterable[String]](input: RDD[S]):
    +      (Int, Long, Map[String, Int], Array[Int]) = {
    +    val sc = input.context
    +
    +    val words = input.flatMap(x => x)
    +
    +    val vocab = words.map(w => (w, 1L))
    +      .reduceByKey(_ + _)
    +      .filter{case (w, c) => c >= $(minCount)}
    +      .collect()
    +      .sortWith{case ((w1, c1), (w2, c2)) => c1 > c2}
    +
    +    val totalWordCount = vocab.map(_._2).sum
    +
    +    val vocabMap = vocab.zipWithIndex.map{case ((w, c), i) =>
    +      w -> i
    +    }.toMap
    +
    +    // We create a cumulative distribution array, unlike the original implementation,
    +    // and use binary search to get insertion points. This should replicate the same
    +    // behavior as the table in the original implementation.
    +    val weights = vocab.map(x => scala.math.pow(x._2, power))
    +    val totalWeight = weights.sum
    +
    +    val normalizedCumWeights = weights.scanLeft(0.0)(_ + _).tail.map(x => (x / totalWeight))
    +
    +    val unigramTableSize =
    +      math.min(maxUnigramTableSize, unigramTableSizeFactor * normalizedCumWeights.length)
    +    val unigramTable = generateUnigramTable(normalizedCumWeights, unigramTableSize)
    +
    +    (vocabMap.size, totalWordCount, vocabMap, unigramTable)
    +  }
    +
    +  /**
    +   * This method implements Word2Vec Continuous Bag-of-Words (CBOW) training with the
    +   * negative sampling optimization, using BLAS for vectorized operations where applicable.
    +   * The algorithm is parallelized in the same way as the skip-gram based estimation.
    +   * @param input the input corpus as an RDD of sentences, each an iterable of words
    +   * @return the trained Word2VecModel
    +   */
    +  private def fitCBOW[S <: Iterable[String]](input: RDD[S]): feature.Word2VecModel = {
    +    val (vocabSize, totalWordCount, vocabMap, uniTable) = generateVocab(input)
    +    val negSamples = $(negativeSamples)
    +    assert(negSamples < vocabSize,
    +      s"Vocab size ($vocabSize) cannot be smaller than negative 
samples($negSamples)")
    +    val seed = $(this.seed)
    +    val initRandom = new XORShiftRandom(seed)
    +
    +    val vectorSize = $(this.vectorSize)
    +
    +    val syn0Global = Array.fill(vocabSize * vectorSize)(initRandom.nextFloat - 0.5f)
    +    val syn1Global = Array.fill(vocabSize * vectorSize)(0.0f)
    +
    +    val sc = input.context
    +
    +    val vocabMapbc = sc.broadcast(vocabMap)
    +    val unigramTablebc = sc.broadcast(uniTable)
    +
    +    val window = $(windowSize)
    +
    +    val digitSentences = input.flatMap{sentence =>
    +      val wordIndexes = sentence.flatMap(vocabMapbc.value.get)
    +      wordIndexes.grouped($(maxSentenceLength)).map(_.toArray)
    +    }.repartition($(numPartitions)).cache()
    +
    +    val learningRate = $(stepSize)
    +
    +    val wordsPerPartition = totalWordCount / $(numPartitions)
    +
    +    logInfo(s"VocabSize: ${vocabMap.size}, TotalWordCount: 
$totalWordCount")
    +
    +    for {iteration <- 1 to $(maxIter)} {
    +      logInfo(s"Starting iteration: $iteration")
    +      val iterationStartTime = System.nanoTime()
    +
    +      val syn0bc = sc.broadcast(syn0Global)
    +      val syn1bc = sc.broadcast(syn1Global)
    +
    +      val partialFits = digitSentences.mapPartitionsWithIndex{ case (i_, iter) =>
    +        logInfo(s"Iteration: $iteration, Partition: $i_")
    +        logInfo(s"Numerical lib class being used : 
${blas.getClass.getName}")
    +        val random = new XORShiftRandom(seed ^ ((i_ + 1) << 16) ^ 
((-iteration - 1) << 8))
    +        val contextWordPairs = iter.flatMap(generateContextWordPairs(_, 
window, random))
    +
    +        val groupedBatches = contextWordPairs.grouped(batchSize)
    +
    +        val negLabels = 1.0f +: Array.fill(negSamples)(0.0f)
    +        val syn0 = syn0bc.value
    +        val syn1 = syn1bc.value
    +        val unigramTable = unigramTablebc.value
    +
    +        // initialize intermediate arrays
    +        val contextVector = Array.fill(vectorSize)(0.0f)
    +        val l2Vectors = Array.fill(vectorSize * (negSamples + 1))(0.0f)
    +        val gb = Array.fill(negSamples + 1)(0.0f)
    +        val hiddenLayerUpdate = Array.fill(vectorSize * (negSamples + 1))(0.0f)
    +        val neu1e = Array.fill(vectorSize)(0.0f)
    +        val wordIndices = Array.fill(negSamples + 1)(0)
    +
    +        val time = System.nanoTime
    +        var batchTime = System.nanoTime
    +        var idx = -1L
    +        for (batch <- groupedBatches) {
    +          idx = idx + 1
    +
    +          val wordRatio =
    +            idx.toFloat * batchSize /
    +            ($(maxIter) * (wordsPerPartition.toFloat + 1)) + ((iteration - 1).toFloat / $(maxIter))
    +          val alpha = math.max(learningRate * 0.0001, learningRate * (1 - wordRatio)).toFloat
    +
    +          if(idx % 10 == 0 && idx > 0) {
    +            logInfo(s"Partition: $i_, wordRatio = $wordRatio, alpha = 
$alpha")
    +            val wordCount = batchSize * idx
    +            val timeTaken = (System.nanoTime - time) / 1e6
    +            val batchWordCount = 10 * batchSize
    +            val currentBatchTime = (System.nanoTime - batchTime) / 1e6
    +            batchTime = System.nanoTime
    +            logInfo(s"Partition: $i_, Batch time: $currentBatchTime ms, 
batch speed: " +
    +              s"${batchWordCount / currentBatchTime * 1000} words/s")
    +            logInfo(s"Partition: $i_, Cumulative time: $timeTaken ms, 
cumulative speed: " +
    +              s"${wordCount / timeTaken * 1000} words/s")
    +          }
    +
    +          val errors = for ((contextIds, word) <- batch) yield {
    +            // initialize vectors to 0
    +            initializeVector(contextVector)
    +            initializeVector(l2Vectors)
    +            initializeVector(gb)
    +            initializeVector(hiddenLayerUpdate)
    +            initializeVector(neu1e)
    +
    +            val scale = 1.0f / contextIds.length
    +
    +            // feed forward
    +            contextIds.foreach { c =>
    +              blas.saxpy(vectorSize, scale, syn0, c * vectorSize, 1, contextVector, 0, 1)
    +            }
    +
    +            generateNegativeSamples(random, word, unigramTable, negSamples, wordIndices)
    +
    +            wordIndices.view.zipWithIndex.foreach { case (wordId, i) =>
    +              blas.scopy(vectorSize, syn1, vectorSize * wordId, 1, l2Vectors, vectorSize * i, 1)
    +            }
    +
    +            val rows = negSamples + 1
    +            val cols = vectorSize
    +            blas
    +              .sgemv("T", cols, rows, 1.0f, l2Vectors, 0, cols, contextVector, 0, 1, 0.0f, gb, 0, 1)
    +
    +            (0 to gb.length-1).foreach {i =>
    --- End diff ---
    
    a simple ``while`` loop would be faster
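    
    For reference, a minimal sketch of the suggested change, assuming the same surrounding scope (``gb`` etc.) as in the diff; the body of the original ``foreach`` is elided above, so the loop body below is only a placeholder comment:
    
    ```scala
    // Sketch: replace the range-based foreach over gb's indices with an
    // index-based while loop, avoiding the Range/closure overhead on this hot path.
    var i = 0
    while (i < gb.length) {
      // ... the original per-index body from the foreach goes here, unchanged ...
      i += 1
    }
    ```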

