baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r420525489



##########
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1167,6 +1168,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // At this point the disk data either does not exist or was deleted because it failed to
     // load, so the event log needs to be replayed.
 
+    // TODO: Maybe need to do other check to see if there's enough memory to
+    // use inMemoryStore.
+    if (hybridKVStoreEnabled) {
+      logInfo("Using HybridKVStore as KVStore")
+      var retried = false
+      var store: HybridKVStore = null
+      while(store == null) {
+        val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+          attempt.lastIndex)
+        val isCompressed = reader.compressionCodec.isDefined
+        logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
+        val lease = dm.lease(reader.totalSize, isCompressed)
+        try {
+          val s = new HybridKVStore()
+          val levelDB = KVUtils.open(lease.tmpPath, metadata)
+          s.setLevelDB(levelDB)
+
+          s.startBackgroundThreadToWriteToDB(new 
HybridKVStore.SwitchingToLevelDBListener {
+            override def onSwitchingToLevelDBSuccess: Unit = {
+              levelDB.close()
+              val newStorePath = lease.commit(appId, attempt.info.attemptId)
+              s.setLevelDB(KVUtils.open(newStorePath, metadata))
+              logInfo(s"Completely switched to use leveldb for app" +
+              s" $appId / ${attempt.info.attemptId}")
+            }
+
+            override def onSwitchingToLevelDBFail(e: Exception): Unit = {
+              logWarning(s"Failed to switch to use LevelDb for app" +
+              s" $appId / ${attempt.info.attemptId}")
+              levelDB.close()
+              throw e
+            }
+          })
+
+          rebuildAppStore(s, reader, attempt.info.lastUpdated.getTime())
+          s.stopBackgroundThreadAndSwitchToLevelDB()
+          store = s
+        } catch {
+          case _: IOException if !retried =>
+            // compaction may touch the file(s) which app rebuild wants to read
+            // compaction wouldn't run in short interval, so try again...
+            logWarning(s"Exception occurred while rebuilding app $appId - 
trying again...")
+            lease.rollback()

Review comment:
       The HybridKVStore is now cleaned up explicitly.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to