HeartSaVioR commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437043628



##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // Objects of these classes will be dumped to levelDB in the background thread
+  private var klassList: Seq[Class[_]] = List(

Review comment:
       This looks fragile. Is there a reason we have to apply an allow-list
instead of dumping all of the entities?
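   For illustration, a rough sketch of the alternative (hypothetical; it just
tracks every class that was written instead of hard-coding the list):
```scala
// Track every class written to the in-memory store, then dump all of them
// in the background thread - no allow-list to keep in sync with new API classes.
private val klassMap = new ConcurrentHashMap[Class[_], Boolean]

override def write(value: Object): Unit = {
  getStore().write(value)
  if (backgroundThread == null) {
    // New classes won't be dumped once the background thread is started
    klassMap.putIfAbsent(value.getClass(), true)
  }
}
```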

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1264,6 +1335,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         endProcessing(rootPath)
     }
   }
+

Review comment:
       nit: remove

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // Objects of these classes will be dumped to levelDB in the background thread
+  private var klassList: Seq[Class[_]] = List(

Review comment:
       This requires a change every time a new API class is introduced, which
is easy to miss - that's why I said 'fragile'. If we have to have some kind of
allow-list vs reject-list, I think a reject-list is better, because storing
something after replaying is an exceptional case.
   
   Actually I don't think they even need to be retained at all during the
transition - migrating them to LevelDB is optional. All of them should be just
some sort of cache. The state of the KV store is stable after replaying has
finished; otherwise it would be problematic for an in-progress application.
Think about an in-progress application - the KV store becomes invalidated once
the event log file is updated, which means newly stored entries are lost. In
practice it doesn't matter at all.
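   If we kept a list at all, a reject-list sketch might look like this
(hypothetical; it assumes `CachedQuantile` is importable here and is the only
known post-replay write):
```scala
// Dump everything except classes known to be written after replay.
private val klassesToSkip: Set[Class[_]] = Set(classOf[CachedQuantile])

private def shouldDump(klass: Class[_]): Boolean =
  !klassesToSkip.contains(klass)
```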

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.util.Utils
+
+/**
+ * A class used to keep track of in-memory store usage by the SHS.
+ */
+private class HistoryServerMemoryManager(
+    conf: SparkConf) extends Logging {
+
+  private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+    logInfo("Initialized memory manager: " +
+      s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+      s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  def lease(
+      appId: String,
+      attemptId: Option[String],
+      eventLogSize: Long,
+      isCompressed: Boolean): Unit = {
+    val memoryUsage = approximateMemoryUsage(eventLogSize, isCompressed)
+    if (memoryUsage + currentUsage.get > maxUsage) {
+      throw new RuntimeException("Not enough memory to create hybrid store " +
+        s"for app $appId / $attemptId.")
+    }
+    active.synchronized {
+      active(appId -> attemptId) = memoryUsage
+    }
+    currentUsage.addAndGet(memoryUsage)
+    logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " +
+      s"app $appId / $attemptId")
+  }
+
+  def release(appId: String, attemptId: Option[String]): Unit = {
+    val memoryUsage = active.synchronized { active.remove(appId -> attemptId) }
+
+    memoryUsage match {
+      case Some(m) =>
+        currentUsage.addAndGet(-m)
+        logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
+          s"app $appId / $attemptId")

Review comment:
       This should be exhaustive, according to the build result. If we don't do
anything for the None case, please add `case None =>`.
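   i.e. something like:
```scala
memoryUsage match {
  case Some(m) =>
    currentUsage.addAndGet(-m)
    logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
      s"app $appId / $attemptId")
  case None => // nothing was leased for this attempt, nothing to do
}
```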

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // Objects of these classes will be dumped to levelDB in the background thread
+  private var klassList: Seq[Class[_]] = List(

Review comment:
       This requires a change every time a new API class is introduced, which
is easy to miss - that's why I said 'fragile'. If we have to have some kind of
allow-list vs reject-list, I think a reject-list is better, because storing
something after replaying is really an exceptional case.
   
   Actually I don't think they even need to be retained at all during the
transition - migrating them to LevelDB is optional. All of them should be just
some sort of cache. The state of the KV store is stable after replaying has
finished; otherwise it would be problematic for an in-progress application.
Think about an in-progress application - the KV store becomes invalidated once
the event log file is updated, which means newly stored entries are lost. In
practice it doesn't matter at all.

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // Objects of these classes will be dumped to levelDB in the background thread
+  private var klassList: Seq[Class[_]] = List(

Review comment:
       If it's possible and works properly (though I guess so), it would be
ideal if we didn't special-case "CachedQuantile" but instead skipped all write
operations to LevelDB during the transition. It would trigger one more
calculation/storing of CachedQuantiles after the transition to LevelDB, but
IMHO it wouldn't matter much, as we already save many seconds on loading and
the overhead should be relatively small.

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // Objects of these classes will be dumped to levelDB in the background thread
+  private var klassList: Seq[Class[_]] = List(

Review comment:
       If it's possible and works properly (though I guess so), it would be
ideal if we didn't special-case "CachedQuantile" but instead skipped all write
operations to LevelDB during the transition. It would trigger one more
calculation/storing of CachedQuantiles after the transition to LevelDB, but
IMHO it wouldn't matter much, as we already save many seconds on loading and
the overhead should be relatively small.
   
   EDIT: sorry, the idea was a bit sketchy and I can imagine possible issues
during the transition. I'm OK with keeping the special case, but it would
still be ideal if we had a better alternative.

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // Objects of these classes will be dumped to levelDB in the background thread
+  private var klassList: Seq[Class[_]] = List(

Review comment:
       If it's possible and works properly, it would be ideal if we didn't
special-case "CachedQuantile" but instead skipped all write operations to
LevelDB during the transition. It would trigger one more calculation/storing
of CachedQuantiles after the transition to LevelDB, but IMHO it wouldn't
matter much, as we already save many seconds on loading and the overhead
should be relatively small.
   
   EDIT: sorry, the idea was a bit sketchy and I can imagine possible issues
during the transition. I'm OK with keeping the special case, but it would
still be ideal if we had a better alternative.

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.

Review comment:
       It may be worth also describing the assumption in the class doc that we
don't expect write operations (except for the caching case) after the switch
to LevelDB has been initiated.
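   For example, something along these lines (wording is only a suggestion):
```scala
/**
 * ...
 * Note: once switchToLevelDB() has been called, callers are expected to stop
 * writing, except for entries that are merely cached (e.g. CachedQuantile).
 */
```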

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+    getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+    getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+    getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+    val store = getStore()
+    store.write(value)
+
+    if (backgroundThread == null) {
+      // New classes won't be dumped once the background thread is started
+      klassMap.putIfAbsent(value.getClass(), true)
+    }
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+    getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+    getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+    getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join()
+      }
+      if (levelDB != null) {
+        levelDB.close()
+      }
+    } catch {
+      case ioe: IOException => throw ioe
+    } finally {
+      inMemoryStore.close()
+    }
+  }
+
+  override def removeAllByIndexValues[T](
+      klass: Class[T],
+      index: String,
+      indexValues: Collection[_]): Boolean = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+    this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(listener: HybridStore.SwitchToLevelDBListener): Unit = {
+    backgroundThread = new Thread(() => {
+      var exception: Option[Exception] = None
+
+      try {
+        for (klass <- klassMap.keys().asScala) {
+          val it = inMemoryStore.view(klass).closeableIterator()
+          while (it.hasNext()) {
+            levelDB.write(it.next())
+          }
+        }
+      } catch {
+        case e: Exception =>
+          exception = Some(e)
+      }
+
+      exception match {
+        case Some(e) =>
+          listener.onSwitchToLevelDBFail(e)

Review comment:
       This line can be moved to the catch statement.

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.util.Utils
+
+/**
+ * A class used to keep track of in-memory store usage by the SHS.
+ */
+private class HistoryServerMemoryManager(
+    conf: SparkConf) extends Logging {
+
+  private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+    logInfo("Initialized memory manager: " +
+      s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+      s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  def lease(
+      appId: String,
+      attemptId: Option[String],
+      eventLogSize: Long,
+      isCompressed: Boolean): Unit = {
+    val memoryUsage = approximateMemoryUsage(eventLogSize, isCompressed)
+    if (memoryUsage + currentUsage.get > maxUsage) {
+      throw new RuntimeException("Not enough memory to create hybrid store " +
+        s"for app $appId / $attemptId.")
+    }
+    active.synchronized {
+      active(appId -> attemptId) = memoryUsage
+    }
+    currentUsage.addAndGet(memoryUsage)
+    logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " +
+      s"app $appId / $attemptId")
+  }
+
+  def release(appId: String, attemptId: Option[String]): Unit = {
+    val memoryUsage = active.synchronized { active.remove(appId -> attemptId) }
+
+    memoryUsage match {
+      case Some(m) =>
+        currentUsage.addAndGet(-m)
+        logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
+          s"app $appId / $attemptId")
+      case None =>
+    }
+  }
+
+  def approximateMemoryUsage(eventLogSize: Long, isCompressed: Boolean): Long = {

Review comment:
       Can be `private`
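   i.e. (the heuristic body shown here is hypothetical, since the hunk cuts
off before it):
```scala
// Compressed logs expand when replayed, so weight them higher.
private def approximateMemoryUsage(eventLogSize: Long, isCompressed: Boolean): Long = {
  if (isCompressed) eventLogSize * 2 else eventLogSize / 2
}
```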

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+    getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+    getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+    getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+    val store = getStore()
+    store.write(value)
+
+    if (backgroundThread == null) {
+      // New classes won't be dumped once the background thread is started
+      klassMap.putIfAbsent(value.getClass(), true)
+    }
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+    getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+    getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+    getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join()
+      }
+      if (levelDB != null) {

Review comment:
       We probably want to guarantee this is executed regardless of whether an
exception is thrown in join().
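   A sketch (it also keeps `inMemoryStore.close()` running unconditionally):
```scala
override def close(): Unit = {
  try {
    if (backgroundThread != null && backgroundThread.isAlive()) {
      // The background thread is still running, wait for it to finish
      backgroundThread.join()
    }
  } finally {
    try {
      if (levelDB != null) {
        levelDB.close()
      }
    } finally {
      inMemoryStore.close()
    }
  }
}
```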

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+    getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+    getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+    getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+    val store = getStore()
+    store.write(value)
+
+    if (backgroundThread == null) {
+      // New classes won't be dumped once the background thread is started
+      klassMap.putIfAbsent(value.getClass(), true)
+    }
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+    getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+    getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+    getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join()
+      }
+      if (levelDB != null) {
+        levelDB.close()
+      }
+    } catch {
+      case ioe: IOException => throw ioe
+    } finally {
+      inMemoryStore.close()
+    }
+  }
+
+  override def removeAllByIndexValues[T](
+      klass: Class[T],
+      index: String,
+      indexValues: Collection[_]): Boolean = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+    this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(listener: HybridStore.SwitchToLevelDBListener): Unit = {
+    backgroundThread = new Thread(() => {
+      var exception: Option[Exception] = None
+
+      try {
+        for (klass <- klassMap.keys().asScala) {
+          val it = inMemoryStore.view(klass).closeableIterator()
+          while (it.hasNext()) {
+            levelDB.write(it.next())
+          }
+        }
+      } catch {
+        case e: Exception =>
+          exception = Some(e)
+      }
+
+      exception match {

Review comment:
       We can just remove this match and the need for the `exception` variable
by applying the comments below.
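   i.e. the thread body could collapse to:
```scala
backgroundThread = new Thread(() => {
  try {
    for (klass <- klassMap.keys().asScala) {
      val it = inMemoryStore.view(klass).closeableIterator()
      while (it.hasNext()) {
        levelDB.write(it.next())
      }
    }
    listener.onSwitchToLevelDBSuccess()
  } catch {
    case e: Exception => listener.onSwitchToLevelDBFail(e)
  }
})
```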

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+    getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+    getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+    getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+    val store = getStore()
+    store.write(value)
+
+    if (backgroundThread == null) {
+      // New classes won't be dumped once the background thread is started
+      klassMap.putIfAbsent(value.getClass(), true)
+    }
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+    getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+    getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+    getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join()
+      }
+      if (levelDB != null) {
+        levelDB.close()
+      }
+    } catch {
+      case ioe: IOException => throw ioe
+    } finally {
+      inMemoryStore.close()
+    }
+  }
+
+  override def removeAllByIndexValues[T](
+      klass: Class[T],
+      index: String,
+      indexValues: Collection[_]): Boolean = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+    this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(listener: HybridStore.SwitchToLevelDBListener): Unit = {
+    backgroundThread = new Thread(() => {
+      var exception: Option[Exception] = None
+
+      try {
+        for (klass <- klassMap.keys().asScala) {
+          val it = inMemoryStore.view(klass).closeableIterator()
+          while (it.hasNext()) {
+            levelDB.write(it.next())
+          }
+        }
+      } catch {
+        case e: Exception =>
+          exception = Some(e)
+      }
+
+      exception match {
+        case Some(e) =>
+          listener.onSwitchToLevelDBFail(e)
+        case None =>
+          listener.onSwitchToLevelDBSuccess()

Review comment:
       And if we don't want the background thread to crash (because the
exception may not be propagated anywhere), then we may need to guard the
`onXXX` callbacks and `inMemoryStore.close()` as well. If we'd rather just let
the background thread crash, it'd be nice to spell out which behavior the rest
of the threads (SHS behavior) should expect.
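   A sketch of the "don't crash" option (hypothetical; it assumes a Logging
mixin, which HybridStore doesn't currently extend):
```scala
try {
  listener.onSwitchToLevelDBSuccess()
} catch {
  case e: Exception =>
    // Swallow and log, since nothing joins on this thread's outcome.
    logWarning("Listener failed after the switch to LevelDB", e)
}
```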

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+    getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+    getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+    getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+    val store = getStore()
+    store.write(value)
+
+    if (backgroundThread == null) {
+      // New classes won't be dumped once the background thread is started
+      klassMap.putIfAbsent(value.getClass(), true)
+    }
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +

Review comment:
       IMHO IllegalStateException seems to be a better fit, but we could also
listen to other reviewers.
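   i.e.:
```scala
throw new IllegalStateException("delete() shouldn't be called after " +
  "the hybrid store begins switching to levelDB")
```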

##########
File path: core/src/main/scala/org/apache/spark/internal/config/History.scala
##########
@@ -195,4 +195,18 @@ private[spark] object History {
       .version("3.0.0")
       .booleanConf
       .createWithDefault(true)
+
+  val HYBRID_STORE_ENABLED = ConfigBuilder("spark.history.store.hybridStore.enabled")
+    .doc("Whether to use HybridStore as the store when parsing event logs. " +
+      "HybridStore will first write data to an in-memory store and have a background thread " +
+      "that dumps data to a disk store after the writing to the in-memory store is completed. " +
+      "Use it with caution, as in-memory store requires higher memory usage.")
+    .version("3.1.0")
+    .booleanConf
+    .createWithDefault(true)
+
+  val MAX_IN_MEMORY_STORE_USAGE = ConfigBuilder("spark.history.store.hybridStore.maxMemoryUsage")
+    .version("3.1.0")

Review comment:
       Probably better to mention that this co-uses the heap memory, so users
should increase the heap size via the memory option for SHS if they enable
the hybrid store.
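   A hypothetical wording for the missing `.doc()` (the exact text is just a
suggestion):
```scala
.doc("Maximum memory space that can be used to create HybridStore. The " +
  "HybridStore co-uses the heap memory, so the heap memory should be " +
  "increased through the memory option for SHS if the HybridStore is enabled.")
```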

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+    getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+    getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+    getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+    val store = getStore()
+    store.write(value)
+
+    if (backgroundThread == null) {
+      // New classes won't be dumped once the background thread is started
+      klassMap.putIfAbsent(value.getClass(), true)
+    }
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+    getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+    getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+    getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join()
+      }
+      if (levelDB != null) {
+        levelDB.close()
+      }
+    } catch {
+      case ioe: IOException => throw ioe
+    } finally {
+      inMemoryStore.close()
+    }
+  }
+
+  override def removeAllByIndexValues[T](
+      klass: Class[T],
+      index: String,
+      indexValues: Collection[_]): Boolean = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+    this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(listener: HybridStore.SwitchToLevelDBListener): Unit = {
+    backgroundThread = new Thread(() => {
+      var exception: Option[Exception] = None
+
+      try {
+        for (klass <- klassMap.keys().asScala) {
+          val it = inMemoryStore.view(klass).closeableIterator()
+          while (it.hasNext()) {
+            levelDB.write(it.next())
+          }
+        }
+      } catch {
+        case e: Exception =>
+          exception = Some(e)
+      }
+
+      exception match {
+        case Some(e) =>
+          listener.onSwitchToLevelDBFail(e)
+        case None =>
+          listener.onSwitchToLevelDBSuccess()

Review comment:
       This block can just be moved to the end of the try statement.

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1197,6 +1213,78 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     KVUtils.open(newStorePath, metadata)
   }
 
+  private def createHybridStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper,
+      metadata: AppStatusStoreMetadata): KVStore = {
+
+    var retried = false
+    var hybridStore: HybridStore = null
+    while (hybridStore == null) {
+      val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+        attempt.lastIndex)
+      val isCompressed = reader.compressionCodec.isDefined
+
+      // Throws an exception if the memory space is not enough
+      memoryManager.lease(appId, attempt.info.attemptId, reader.totalSize, isCompressed)
+
+      logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
+      val lease = dm.lease(reader.totalSize, isCompressed)
+      val isLeaseRolledBack = new java.util.concurrent.atomic.AtomicBoolean(false)
+      var store: HybridStore = null
+      try {
+        store = new HybridStore()
+        val levelDB = KVUtils.open(lease.tmpPath, metadata)
+        store.setLevelDB(levelDB)
+        rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
+
+        // Start the background thread to dump data to levelDB when writing to
+        // InMemoryStore is completed.
+        store.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {

Review comment:
       I guess this can be moved out of the try statement, so that we can
handle the error cases during replay and during the switch separately. Maybe
this can also address your comment below.
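   Roughly (hypothetical restructuring; lease bookkeeping elided):
```scala
// Replay failures are handled by the enclosing try; only start the switch
// once replay has succeeded.
try {
  rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
} catch {
  case e: Exception =>
    store.close()
    throw e
}
// Switch failures are reported through the listener instead.
store.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {
  override def onSwitchToLevelDBSuccess(): Unit = { /* commit the lease */ }
  override def onSwitchToLevelDBFail(e: Exception): Unit = { /* roll it back */ }
})
hybridStore = store
```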

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+    getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+    getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+    getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+    val store = getStore()
+    store.write(value)
+
+    if (backgroundThread == null) {
+      // New classes won't be dumped once the background thread is started
+      klassMap.putIfAbsent(value.getClass(), true)
+    }
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+    getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+    getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+    getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join()
+      }
+      if (levelDB != null) {
+        levelDB.close()
+      }
+    } catch {
+      case ioe: IOException => throw ioe
+    } finally {
+      inMemoryStore.close()
+    }
+  }
+
+  override def removeAllByIndexValues[T](
+      klass: Class[T],
+      index: String,
+      indexValues: Collection[_]): Boolean = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +

Review comment:
       ditto

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+    getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+    getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+    getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+    val store = getStore()
+    store.write(value)
+
+    if (backgroundThread == null) {
+      // New classes won't be dumped once the background thread is started
+      klassMap.putIfAbsent(value.getClass(), true)
+    }
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+    getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+    getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+    getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join()
+      }
+      if (levelDB != null) {
+        levelDB.close()
+      }
+    } catch {
+      case ioe: IOException => throw ioe
+    } finally {
+      inMemoryStore.close()
+    }
+  }
+
+  override def removeAllByIndexValues[T](
+      klass: Class[T],
+      index: String,
+      indexValues: Collection[_]): Boolean = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+    this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when writing to inMemoryStore is done. A
+   * background thread will be created and started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(listener: HybridStore.SwitchToLevelDBListener): Unit = {
+    backgroundThread = new Thread(() => {
+      var exception: Option[Exception] = None
+
+      try {
+        for (klass <- klassMap.keys().asScala) {
+          val it = inMemoryStore.view(klass).closeableIterator()
+          while (it.hasNext()) {
+            levelDB.write(it.next())
+          }
+        }
+      } catch {
+        case e: Exception =>
+          exception = Some(e)
+      }
+
+      exception match {
+        case Some(e) =>
+          listener.onSwitchToLevelDBFail(e)
+        case None =>
+          listener.onSwitchToLevelDBSuccess()
+          shouldUseInMemoryStore.set(false)
+          inMemoryStore.close()
+      }
+    })
+    backgroundThread.setDaemon(true)
+    backgroundThread.setName("hybridstore-switch-to-leveldb")
+    backgroundThread.start()
+  }
+
+  /**
+   * This method returns the store that we should use.
+   */
+  private def getStore(): KVStore = {
+    if (shouldUseInMemoryStore.get) {
+      inMemoryStore
+    } else {
+      levelDB
+    }
+  }
+

Review comment:
       nit: I guess we don't prefer leaving an empty line just for margin.

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+    getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+    getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+    getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+    val store = getStore()
+    store.write(value)
+
+    if (backgroundThread == null) {
+      // New classes won't be dumped once the background thread is started
+      klassMap.putIfAbsent(value.getClass(), true)
+    }
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+    getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+    getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+    getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join()
+      }
+      if (levelDB != null) {
+        levelDB.close()
+      }
+    } catch {
+      case ioe: IOException => throw ioe
+    } finally {
+      inMemoryStore.close()
+    }
+  }
+
+  override def removeAllByIndexValues[T](
+      klass: Class[T],
+      index: String,
+      indexValues: Collection[_]): Boolean = {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+    this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when writing to inMemoryStore is done. A
+   * background thread will be created and started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(listener: HybridStore.SwitchToLevelDBListener): Unit = {
+    backgroundThread = new Thread(() => {
+      var exception: Option[Exception] = None
+
+      try {
+        for (klass <- klassMap.keys().asScala) {
+          val it = inMemoryStore.view(klass).closeableIterator()
+          while (it.hasNext()) {
+            levelDB.write(it.next())
+          }
+        }
+      } catch {
+        case e: Exception =>
+          exception = Some(e)
+      }
+
+      exception match {
+        case Some(e) =>
+          listener.onSwitchToLevelDBFail(e)
+        case None =>
+          listener.onSwitchToLevelDBSuccess()
+          shouldUseInMemoryStore.set(false)
+          inMemoryStore.close()
+      }
+    })
+    backgroundThread.setDaemon(true)
+    backgroundThread.setName("hybridstore-switch-to-leveldb")
+    backgroundThread.start()
+  }
+
+  /**
+   * This method returns the store that we should use.
+   */
+  private def getStore(): KVStore = {
+    if (shouldUseInMemoryStore.get) {
+      inMemoryStore
+    } else {
+      levelDB
+    }
+  }
+
+}
+
+private[history] object HybridStore {
+
+  trait SwitchToLevelDBListener {
+
+    def onSwitchToLevelDBSuccess(): Unit
+
+    def onSwitchToLevelDBFail(e: Exception): Unit
+  }
+

Review comment:
       nit: probably same here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
