redsanket commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r436145152



##########
File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1197,6 +1213,71 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
     KVUtils.open(newStorePath, metadata)
   }
 
+  private def createHybridStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper,
+      metadata: AppStatusStoreMetadata): KVStore = {
+
+    val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+      attempt.lastIndex)
+    val isCompressed = reader.compressionCodec.isDefined
+
+    val memoryUsage = approximateMemoryUsage(reader.totalSize, isCompressed)
+    if (currentInMemoryStoreUsage.get + memoryUsage > maxInMemoryStoreUsage) {
+      throw new IllegalStateException("Not enough in-memory storage to create 
hybrid store.")
+    }
+    currentInMemoryStoreUsage.addAndGet(memoryUsage)
+    logInfo(s"Attempt creating hybrid store to parse $appId / 
${attempt.info.attemptId}. " +
+      s"Requested ${Utils.bytesToString(memoryUsage)} in-memory storage 
quota.")
+
+    logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
+    val lease = dm.lease(reader.totalSize, isCompressed)
+    val isLeaseRolledBack = new 
java.util.concurrent.atomic.AtomicBoolean(false)
+    var store: HybridStore = null
+    try {
+      store = new HybridStore()
+      val levelDB = KVUtils.open(lease.tmpPath, metadata)
+      store.setLevelDB(levelDB)
+      store.setCachedQuantileKlass(classOf[CachedQuantile])
+      rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
+
+      // Start the background thread to dump data to levelDB when writing to
+      // InMemoryStore is completed.
+      store.switchingToLevelDB(new HybridStore.SwitchingToLevelDBListener {
+        override def onSwitchingToLevelDBSuccess: Unit = {
+          levelDB.close()
+          val newStorePath = lease.commit(appId, attempt.info.attemptId)
+          store.setLevelDB(KVUtils.open(newStorePath, metadata))
+          currentInMemoryStoreUsage.addAndGet(-memoryUsage)

Review comment:
       can we do decrementAndGet here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to