Jeff Stein created SPARK-13048:
----------------------------------

             Summary: EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
                 Key: SPARK-13048
                 URL: https://issues.apache.org/jira/browse/SPARK-13048
             Project: Spark
          Issue Type: Bug
          Components: MLlib
    Affects Versions: 1.5.2
         Environment: Standalone Spark cluster
            Reporter: Jeff Stein
In EMLDAOptimizer, all checkpoints are deleted before the DistributedLDAModel is returned. The most recent checkpoint is still needed for operations on the DistributedLDAModel in a couple of scenarios:

- The graph does not fit in memory on the worker nodes (e.g. a very large data set).
- A late worker failure requires re-reading the now-deleted checkpoint.

I ran into this problem running a 10M record LDA model in a memory-starved environment. The model persistently failed either in the {{collect at LDAModel.scala:528}} stage when converting to a LocalLDAModel, or in the {{reduce at LDAModel.scala:563}} stage when calling "describeTopics". In both cases, a FileNotFoundException is thrown while attempting to access a checkpoint file:

{noformat}
java.io.FileNotFoundException: File does not exist: /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
{noformat}

I'm not sure what the correct fix is here; it might involve a class signature change. A simpler alternative is to leave the last checkpoint in place and expect the user to clean the checkpoint directory themselves.

The relevant code is included below.

LDAOptimizer.scala:

{noformat}
override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
  require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
  this.graphCheckpointer.deleteAllCheckpoints()
  // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
  // LDAModel.toLocal conversion
  new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
    Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
    iterationTimes)
}
{noformat}

PeriodicCheckpointer.scala:

{noformat}
/**
 * Call this at the end to delete any remaining checkpoint files.
 */
def deleteAllCheckpoints(): Unit = {
  while (checkpointQueue.nonEmpty) {
    removeCheckpointFile()
  }
}

/**
 * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
 * This prints a warning but does not fail if the files cannot be removed.
 */
private def removeCheckpointFile(): Unit = {
  val old = checkpointQueue.dequeue()
  // Since the old checkpoint is not deleted by Spark, we manually delete it.
  val fs = FileSystem.get(sc.hadoopConfiguration)
  getCheckpointFiles(old).foreach { checkpointFile =>
    try {
      fs.delete(new Path(checkpointFile), true)
    } catch {
      case e: Exception =>
        logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
          checkpointFile)
    }
  }
}
{noformat}
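One possible shape for the "leave the last checkpoint around" alternative is sketched below. This is an illustration only, not a tested patch: the method names deleteAllCheckpointsButLast and getLastCheckpointFiles are hypothetical, and the sketch relies only on members already visible in the quoted PeriodicCheckpointer code (checkpointQueue, removeCheckpointFile, getCheckpointFiles). EMLDAOptimizer.getLDAModel would then call deleteAllCheckpointsButLast() instead of deleteAllCheckpoints().

{noformat}
// Hypothetical additions to PeriodicCheckpointer.scala (sketch, not a tested patch).

/**
 * Delete all checkpoint files except the most recent one, so that the graph
 * backing the returned DistributedLDAModel can still be recomputed from it.
 */
def deleteAllCheckpointsButLast(): Unit = {
  // checkpointQueue holds checkpoints oldest-first; removing while more than
  // one remains leaves only the newest checkpoint's files on disk.
  while (checkpointQueue.size > 1) {
    removeCheckpointFile()
  }
}

/**
 * Expose the remaining checkpoint files so the caller can delete them
 * once the model is no longer needed.
 */
def getLastCheckpointFiles: Array[String] = {
  checkpointQueue.flatMap(getCheckpointFiles).toArray
}
{noformat}

With something like this, the last checkpoint directory would survive until the user removes it, which matches the simple alternative described above.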