Jeff Stein created SPARK-13048:
----------------------------------

             Summary: EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
                 Key: SPARK-13048
                 URL: https://issues.apache.org/jira/browse/SPARK-13048
             Project: Spark
          Issue Type: Bug
          Components: MLlib
    Affects Versions: 1.5.2
         Environment: Standalone Spark cluster
            Reporter: Jeff Stein


In EMLDAOptimizer, all checkpoints are deleted before returning the 
DistributedLDAModel.

The most recent checkpoint is still needed for operations on the 
DistributedLDAModel in a couple of scenarios:
- The graph doesn't fit in memory on the worker nodes (e.g. a very large data set).
- A late worker failure requires re-reading the checkpoint that the graph now depends on.
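
To illustrate both scenarios, here is a toy model in plain Scala. It is not Spark code: {{ToyCheckpointedData}}, {{evict}} and the temp file are hypothetical stand-ins for a checkpointed graph, a lost cached copy, and a checkpoint file. It throws java.nio's NoSuchFileException where HDFS would raise FileNotFoundException.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, NoSuchFileException, Path}

// Toy model (not Spark): data whose lineage has been truncated to a checkpoint file.
final class ToyCheckpointedData(checkpointFile: Path) {
  // In-memory copy, standing in for partitions cached on a worker.
  private var cached: Option[String] = None

  // Return the data, re-reading the checkpoint file when no cached copy exists.
  def read(): String = cached match {
    case Some(data) => data
    case None =>
      val data = new String(Files.readAllBytes(checkpointFile), StandardCharsets.UTF_8)
      cached = Some(data)
      data
  }

  // Simulates memory pressure or a lost worker: the cached copy disappears.
  def evict(): Unit = {
    cached = None
  }
}

object ToyCheckpointDemo {
  def main(args: Array[String]): Unit = {
    val file = Files.createTempFile("rdd-", ".part")
    Files.write(file, "topic counts".getBytes(StandardCharsets.UTF_8))

    val data = new ToyCheckpointedData(file)
    data.read()          // first read populates the cache
    Files.delete(file)   // analogous to deleting every checkpoint
    data.read()          // still works while the cached copy survives
    data.evict()         // cache lost: the checkpoint is now the only source

    try {
      data.read()
    } catch {
      case e: NoSuchFileException => println("checkpoint missing: " + e.getMessage)
    }
  }
}
```

The read only fails once the cached copy is gone, which would be consistent with the problem showing up only under memory pressure or after a worker failure.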

I ran into this problem while training an LDA model on 10M records in a 
memory-starved environment. The job failed repeatedly, either in the 
{{collect at LDAModel.scala:528}} stage when converting to a LocalLDAModel 
or in the {{reduce at LDAModel.scala:563}} stage when calling 
{{describeTopics}}. In both cases, a FileNotFoundException is thrown while 
attempting to access a checkpoint file.

I'm not sure what the correct fix is here; it might involve a class signature 
change. A simpler alternative is to keep the last checkpoint and expect 
users to clean up the checkpoint directory themselves.
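
A minimal sketch of that simpler alternative, in plain Scala rather than against the Spark sources. The class {{CheckpointQueueSketch}}, the method {{deleteAllCheckpointsButLast}} and the {{deleteFiles}} callback are hypothetical names used for illustration only; the queue mirrors the oldest-first {{checkpointQueue}} shown in PeriodicCheckpointer below.

```scala
import scala.collection.mutable

// Hypothetical stand-in for PeriodicCheckpointer's bookkeeping; not Spark code.
// `deleteFiles` is an assumed callback that removes one checkpoint's files.
class CheckpointQueueSketch(deleteFiles: String => Unit) {
  // Checkpoint directories, oldest first.
  private val checkpointQueue = mutable.Queue.empty[String]

  // Record a newly written checkpoint.
  def add(checkpointDir: String): Unit = {
    checkpointQueue.enqueue(checkpointDir)
  }

  // Delete every checkpoint except the most recent one, which the
  // returned model may still need to read.
  def deleteAllCheckpointsButLast(): Unit = {
    while (checkpointQueue.size > 1) {
      deleteFiles(checkpointQueue.dequeue())
    }
  }

  // Checkpoints left behind for the caller to clean up later.
  def remaining: List[String] = checkpointQueue.toList
}
```

With this approach the optimizer would call the "but last" variant instead of {{deleteAllCheckpoints()}}, and the caller would need some way to learn which checkpoint remains so it can be removed once the model is no longer in use.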

{noformat}
java.io.FileNotFoundException: File does not exist: /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
{noformat}

Relevant code is included below.

LDAOptimizer.scala:
{noformat}
  override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
    this.graphCheckpointer.deleteAllCheckpoints()
    // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
    // LDAModel.toLocal conversion
    new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
      Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
      iterationTimes)
  }
{noformat}

PeriodicCheckpointer.scala:
{noformat}
  /**
   * Call this at the end to delete any remaining checkpoint files.
   */
  def deleteAllCheckpoints(): Unit = {
    while (checkpointQueue.nonEmpty) {
      removeCheckpointFile()
    }
  }

  /**
   * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
   * This prints a warning but does not fail if the files cannot be removed.
   */
  private def removeCheckpointFile(): Unit = {
    val old = checkpointQueue.dequeue()
    // Since the old checkpoint is not deleted by Spark, we manually delete it.
    val fs = FileSystem.get(sc.hadoopConfiguration)
    getCheckpointFiles(old).foreach { checkpointFile =>
      try {
        fs.delete(new Path(checkpointFile), true)
      } catch {
        case e: Exception =>
          logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
            checkpointFile)
      }
    }
  }
{noformat}


