[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-07-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r452534098



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore first and use a background thread to dump
+ * data to LevelDB once the app store is restored. We don't expect write
+ * operations (except the case for caching) after switchToLevelDB() is called.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // Flag to indicate whether this hybrid store is closed; used to avoid
+  // starting the background thread after the store is closed
+  private val closed = new AtomicBoolean(false)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+getStore().write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("delete() shouldn't be called after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+try {
+  closed.set(true)
+  if (backgroundThread != null && backgroundThread.isAlive()) {
+// The background thread is still running, wait for it to finish
+backgroundThread.join()
+  }
+} finally {
+  inMemoryStore.close()
+  if (levelDB != null) {
+levelDB.close()
+  }
+}
+  }
+
+  override def removeAllByIndexValues[T](
+  klass: Class[T],
+  index: String,
+  indexValues: Collection[_]): Boolean = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("removeAllByIndexValues() shouldn't be " 
+
+"called after the hybrid store begins switching to levelDB")
+}
+
+getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when writing to inMemoryStore is done. A background
+   * thread will be created and started to dump the data in inMemoryStore to
+   * levelDB. Once the dump is complete, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(
+  listener: HybridStore.SwitchToLevelDBListener,
+  appId: String,
+  attemptId: Option[String]): Unit = {
+if 
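The quoted diff is truncated here, before the private getStore() helper that every method above delegates to. For orientation, a minimal sketch of what such a dispatch could look like, assuming only the shouldUseInMemoryStore flag declared in the quoted fields (an illustration, not the PR's actual body):

```scala
// Hypothetical sketch: return the backing store selected by the
// shouldUseInMemoryStore flag. Reads and writes hit inMemoryStore until the
// background dump finishes, after which they hit levelDB.
private def getStore(): KVStore = {
  if (shouldUseInMemoryStore.get) {
    inMemoryStore
  } else {
    levelDB
  }
}
```

Because the flag is an AtomicBoolean, readers racing with the switch always see a consistent choice of store without extra locking.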

[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-07-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r452327580



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore first and use a background thread to dump
+ * data to LevelDB once the app store is restored. We don't expect write
+ * operations (except the case for caching) after switchToLevelDB() is called.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // Flag to indicate whether this hybrid store is closed; used to avoid
+  // starting the background thread after the store is closed
+  private val closed = new AtomicBoolean(false)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+getStore().write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("delete() shouldn't be called after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+try {
+  closed.set(true)
+  if (backgroundThread != null && backgroundThread.isAlive()) {
+// The background thread is still running, wait for it to finish
+backgroundThread.join()
+  }
+} finally {
+  inMemoryStore.close()
+  if (levelDB != null) {
+levelDB.close()
+  }
+}
+  }
+
+  override def removeAllByIndexValues[T](
+  klass: Class[T],
+  index: String,
+  indexValues: Collection[_]): Boolean = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("removeAllByIndexValues() shouldn't be " 
+
+"called after the hybrid store begins switching to levelDB")
+}
+
+getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when writing to inMemoryStore is done. A background
+   * thread will be created and started to dump the data in inMemoryStore to
+   * levelDB. Once the dump is complete, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(
+  listener: HybridStore.SwitchToLevelDBListener,
+  appId: String,
+  attemptId: Option[String]): Unit = {
+if 

[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-07-08 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r451892343



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore first and use a background thread to dump
+ * data to LevelDB once the app store is restored. We don't expect write
+ * operations (except the case for caching) after switchToLevelDB() is called.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // Flag to indicate whether this hybrid store is closed; used to avoid
+  // starting the background thread after the store is closed
+  private val closed = new AtomicBoolean(false)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+getStore().write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("delete() shouldn't be called after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+try {
+  closed.set(true)
+  if (backgroundThread != null && backgroundThread.isAlive()) {
+// The background thread is still running, wait for it to finish
+backgroundThread.join()
+  }
+} finally {
+  inMemoryStore.close()
+  if (levelDB != null) {
+levelDB.close()
+  }
+}
+  }
+
+  override def removeAllByIndexValues[T](
+  klass: Class[T],
+  index: String,
+  indexValues: Collection[_]): Boolean = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("removeAllByIndexValues() shouldn't be " 
+
+"called after the hybrid store begins switching to levelDB")
+}
+
+getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when writing to inMemoryStore is done. A background
+   * thread will be created and started to dump the data in inMemoryStore to
+   * levelDB. Once the dump is complete, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(
+  listener: HybridStore.SwitchToLevelDBListener,
+  appId: String,
+  attemptId: Option[String]): Unit = {
+if 


[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-07-08 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r451838037



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore first and use a background thread to dump
+ * data to LevelDB once the app store is restored. We don't expect write
+ * operations (except the case for caching) after switchToLevelDB() is called.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // Flag to indicate whether this hybrid store is closed; used to avoid
+  // starting the background thread after the store is closed
+  private val closed = new AtomicBoolean(false)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+getStore().write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("delete() shouldn't be called after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+try {
+  closed.set(true)
+  if (backgroundThread != null && backgroundThread.isAlive()) {
+// The background thread is still running, wait for it to finish
+backgroundThread.join()
+  }
+} finally {
+  inMemoryStore.close()
+  if (levelDB != null) {
+levelDB.close()
+  }
+}
+  }
+
+  override def removeAllByIndexValues[T](
+  klass: Class[T],
+  index: String,
+  indexValues: Collection[_]): Boolean = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("removeAllByIndexValues() shouldn't be " 
+
+"called after the hybrid store begins switching to levelDB")
+}
+
+getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when writing to inMemoryStore is done. A background
+   * thread will be created and started to dump the data in inMemoryStore to
+   * levelDB. Once the dump is complete, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(
+  listener: HybridStore.SwitchToLevelDBListener,
+  appId: String,
+  attemptId: Option[String]): Unit = {
+if 

[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-07-08 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r451771981



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore first and use a background thread to dump
+ * data to LevelDB once the app store is restored. We don't expect write
+ * operations (except the case for caching) after switchToLevelDB() is called.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // Flag to indicate whether this hybrid store is closed; used to avoid
+  // starting the background thread after the store is closed
+  private val closed = new AtomicBoolean(false)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+getStore().write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("delete() shouldn't be called after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+try {
+  closed.set(true)
+  if (backgroundThread != null && backgroundThread.isAlive()) {
+// The background thread is still running, wait for it to finish
+backgroundThread.join()
+  }
+} finally {
+  inMemoryStore.close()
+  if (levelDB != null) {
+levelDB.close()
+  }
+}
+  }
+
+  override def removeAllByIndexValues[T](
+  klass: Class[T],
+  index: String,
+  indexValues: Collection[_]): Boolean = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("removeAllByIndexValues() shouldn't be " 
+
+"called after the hybrid store begins switching to levelDB")
+}
+
+getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when writing to inMemoryStore is done. A background
+   * thread will be created and started to dump the data in inMemoryStore to
+   * levelDB. Once the dump is complete, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(
+  listener: HybridStore.SwitchToLevelDBListener,
+  appId: String,
+  attemptId: Option[String]): Unit = {
+if 
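Each quoted excerpt above breaks off at the head of switchToLevelDB(). As a rough illustration of the dump-and-switch step the surrounding comments describe, the body would plausibly iterate klassMap on the background thread and copy each class's objects into levelDB; a sketch under that assumption follows (the listener callback names are invented from the SwitchToLevelDBListener type in the signature, not confirmed by the excerpt):

```scala
// Hypothetical sketch of the dump-and-switch step; not the PR's actual code.
backgroundThread = new Thread(() => {
  try {
    // Copy every object of every class seen by write() from inMemoryStore
    // into levelDB.
    klassMap.keySet().asScala.foreach { klass =>
      val it = inMemoryStore.view(klass).closeableIterator()
      try {
        while (it.hasNext()) {
          levelDB.write(it.next())
        }
      } finally {
        it.close()
      }
    }
    // From this point on, getStore() resolves to levelDB.
    shouldUseInMemoryStore.set(false)
    listener.onSwitchToLevelDBSuccess() // assumed callback name
  } catch {
    case e: Exception =>
      listener.onSwitchToLevelDBFail(e) // assumed callback name
  }
})
backgroundThread.start()
```

Flipping shouldUseInMemoryStore only after the copy completes is what makes the guards in delete() and removeAllByIndexValues() sufficient: once the background thread exists, mutations that could race with the dump are rejected outright.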

[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-07-02 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r449140405



##
File path: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.util.Utils
+
+/**
+ * A class used to keep track of in-memory store usage by the SHS.
+ */
+private class HistoryServerMemoryManager(
+conf: SparkConf) extends Logging {
+
+  private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+logInfo("Initialized memory manager: " +
+  s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+  s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  def lease(
+  appId: String,
+  attemptId: Option[String],
+  eventLogSize: Long,
+  isCompressed: Boolean): Unit = {
+val memoryUsage = approximateMemoryUsage(eventLogSize, isCompressed)
+if (memoryUsage + currentUsage.get > maxUsage) {
+  throw new RuntimeException("Not enough memory to create hybrid store " +
+s"for app $appId / $attemptId.")
+}
+active.synchronized {
+  active(appId -> attemptId) = memoryUsage
+}
+currentUsage.addAndGet(memoryUsage)
+logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " +
+  s"app $appId / $attemptId")
+  }
+
+  def release(appId: String, attemptId: Option[String]): Unit = {
+val memoryUsage = active.synchronized { active.remove(appId -> attemptId) }
+
+memoryUsage match {
+  case Some(m) =>
+currentUsage.addAndGet(-m)
+logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
+  s"app $appId / $attemptId")
+  case None =>
+}
+  }
+
+  private def approximateMemoryUsage(eventLogSize: Long, isCompressed: Boolean): Long = {
+if (isCompressed) {
+  eventLogSize * 2

Review comment:
   Hi, I updated the number on 
https://github.com/apache/spark/pull/28412#issuecomment-653110422.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
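The lease()/release() pair quoted above is the whole accounting protocol. The provider-side wiring is not shown in this thread, so the following usage sketch is hypothetical (method and variable names invented for illustration):

```scala
// Hypothetical provider-side sketch; not code from this PR. Bracket the
// in-memory phase of loading one app with lease()/release() so concurrent
// loads cannot exceed MAX_IN_MEMORY_STORE_USAGE.
def loadAppWithMemoryAccounting(
    memoryManager: HistoryServerMemoryManager,
    appId: String,
    attemptId: Option[String],
    eventLogSize: Long,
    isCompressed: Boolean): Unit = {
  // lease() throws a RuntimeException when the budget is exhausted; a caller
  // could catch it and fall back to building a disk-backed store directly.
  memoryManager.lease(appId, attemptId, eventLogSize, isCompressed)
  try {
    // ... build a HybridStore and replay the event log into it ...
  } finally {
    // Give the budget back once the data has been dumped to LevelDB
    // (or if loading failed).
    memoryManager.release(appId, attemptId)
  }
}
```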



[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-26 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r446221919



##
File path: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.util.Utils
+
+/**
+ * A class used to keep track of in-memory store usage by the SHS.
+ */
+private class HistoryServerMemoryManager(
+conf: SparkConf) extends Logging {
+
+  private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+logInfo("Initialized memory manager: " +
+  s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+  s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  def lease(
+  appId: String,
+  attemptId: Option[String],
+  eventLogSize: Long,
+  isCompressed: Boolean): Unit = {
+val memoryUsage = approximateMemoryUsage(eventLogSize, isCompressed)
+if (memoryUsage + currentUsage.get > maxUsage) {
+  throw new RuntimeException("Not enough memory to create hybrid store " +
+s"for app $appId / $attemptId.")
+}
+active.synchronized {
+  active(appId -> attemptId) = memoryUsage
+}
+currentUsage.addAndGet(memoryUsage)
+logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " +
+  s"app $appId / $attemptId")
+  }
+
+  def release(appId: String, attemptId: Option[String]): Unit = {
+val memoryUsage = active.synchronized { active.remove(appId -> attemptId) }
+
+memoryUsage match {
+  case Some(m) =>
+currentUsage.addAndGet(-m)
+logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
+  s"app $appId / $attemptId")
+  case None =>
+}
+  }
+
+  private def approximateMemoryUsage(eventLogSize: Long, isCompressed: Boolean): Long = {
+if (isCompressed) {
+  eventLogSize * 2

Review comment:
   Sure, I will run the experiment with different compression codecs and put the numbers here.








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-16 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r441165290



##
File path: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.util.Utils
+
+/**
+ * A class used to keep track of in-memory store usage by the SHS.
+ */
+private class HistoryServerMemoryManager(
+conf: SparkConf) extends Logging {
+
+  private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+logInfo("Initialized memory manager: " +
+  s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+  s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  def lease(
+  appId: String,
+  attemptId: Option[String],
+  eventLogSize: Long,
+  isCompressed: Boolean): Unit = {
+val memoryUsage = approximateMemoryUsage(eventLogSize, isCompressed)
+if (memoryUsage + currentUsage.get > maxUsage) {
+  throw new RuntimeException("Not enough memory to create hybrid store " +
+s"for app $appId / $attemptId.")
+}
+active.synchronized {
+  active(appId -> attemptId) = memoryUsage
+}
+currentUsage.addAndGet(memoryUsage)
+logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " +
+  s"app $appId / $attemptId")
+  }
+
+  def release(appId: String, attemptId: Option[String]): Unit = {
+val memoryUsage = active.synchronized { active.remove(appId -> attemptId) }
+
+memoryUsage match {
+  case Some(m) =>
+currentUsage.addAndGet(-m)
+logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
+  s"app $appId / $attemptId")
+  case None =>
+}
+  }
+
+  private def approximateMemoryUsage(eventLogSize: Long, isCompressed: Boolean): Long = {
+if (isCompressed) {
+  eventLogSize * 2

Review comment:
   A comparison between memory usage and disk usage for some randomly chosen Spark 2.4 logs, parsed by the 2.4 SHS:
   
   log size | 1.0g | 1.26g | 1.16g | 1.11g
   -- | -- | -- | -- | --
   memory usage | 97.8m | 232m | 53.1m | 31.4m
   leveldb filesize | 183m | 368m | 220m | 99m
   
   
   
   








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-16 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r440955307



##
File path: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.util.Utils
+
+/**
+ * A class used to keep track of in-memory store usage by the SHS.
+ */
+private class HistoryServerMemoryManager(
+conf: SparkConf) extends Logging {
+
+  private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+logInfo("Initialized memory manager: " +
+  s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+  s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  def lease(
+  appId: String,
+  attemptId: Option[String],
+  eventLogSize: Long,
+  isCompressed: Boolean): Unit = {
+val memoryUsage = approximateMemoryUsage(eventLogSize, isCompressed)
+if (memoryUsage + currentUsage.get > maxUsage) {
+  throw new RuntimeException("Not enough memory to create hybrid store " +
+s"for app $appId / $attemptId.")
+}
+active.synchronized {
+  active(appId -> attemptId) = memoryUsage
+}
+currentUsage.addAndGet(memoryUsage)
+logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " +
+  s"app $appId / $attemptId")
+  }
+
+  def release(appId: String, attemptId: Option[String]): Unit = {
+val memoryUsage = active.synchronized { active.remove(appId -> attemptId) }
+
+memoryUsage match {
+  case Some(m) =>
+currentUsage.addAndGet(-m)
+logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
+  s"app $appId / $attemptId")
+  case None =>
+}
+  }
+
+  private def approximateMemoryUsage(eventLogSize: Long, isCompressed: Boolean): Long = {
+if (isCompressed) {
+  eventLogSize * 2

Review comment:
   I will add comments about this function and test compressed files later. So far I think this estimation is conservative and safe. For large uncompressed log files, the memory and disk usage are usually smaller than the estimated usage (half of the log file size). I measured some log files of size ~1g that I randomly picked from production Spark jobs.
   
   edit: These results may not be accurate because I was parsing Spark 2.4 log files with the Spark 3 history server.
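The quote truncates before the uncompressed branch of the estimator. Based on the "half of the log file size" heuristic described in the comment above, the full function might look like the sketch below; the `* 2` branch matches the quoted diff, while the `/ 2` branch is an inference, not confirmed by the excerpt:

```scala
// Sketch of the estimator under discussion. The compressed branch is in the
// quoted diff; the uncompressed branch is inferred from the "half of the log
// file size" heuristic mentioned above.
private def approximateMemoryUsage(eventLogSize: Long, isCompressed: Boolean): Long = {
  if (isCompressed) {
    // Compressed logs expand when replayed, so budget twice the on-disk size.
    eventLogSize * 2
  } else {
    // Uncompressed logs typically need about half their on-disk size in memory.
    eventLogSize / 2
  }
}
```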








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-16 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r440955307



##
File path: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.util.Utils
+
+/**
+ * A class used to keep track of in-memory store usage by the SHS.
+ */
+private class HistoryServerMemoryManager(
+conf: SparkConf) extends Logging {
+
+  private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+logInfo("Initialized memory manager: " +
+  s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+  s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  def lease(
+  appId: String,
+  attemptId: Option[String],
+  eventLogSize: Long,
+  isCompressed: Boolean): Unit = {
+val memoryUsage = approximateMemoryUsage(eventLogSize, isCompressed)
+if (memoryUsage + currentUsage.get > maxUsage) {
+  throw new RuntimeException("Not enough memory to create hybrid store " +
+s"for app $appId / $attemptId.")
+}
+active.synchronized {
+  active(appId -> attemptId) = memoryUsage
+}
+currentUsage.addAndGet(memoryUsage)
+logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " +
+  s"app $appId / $attemptId")
+  }
+
+  def release(appId: String, attemptId: Option[String]): Unit = {
+val memoryUsage = active.synchronized { active.remove(appId -> attemptId) }
+
+memoryUsage match {
+  case Some(m) =>
+currentUsage.addAndGet(-m)
+logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
+  s"app $appId / $attemptId")
+  case None =>
+}
+  }
+
+  private def approximateMemoryUsage(eventLogSize: Long, isCompressed: Boolean): Long = {
+if (isCompressed) {
+  eventLogSize * 2

Review comment:
   I will add comments about this function and test compressed files later. So far I think this estimation is conservative and safe. For large uncompressed log files, the memory and disk usage are usually much smaller than the estimated usage (half of the log file size). I measured some log files of size ~1g that I randomly picked from production Spark jobs.
   
   log filesize | 1.0g | 1.26g | 1.16g | 1.11g
   -- | -- | -- | -- | --
   memory usage | 113m | 149m | 63m | 46m
   leveldb filesize | 175m | 211m | 42m | 77m
   
   








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-16 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r440907281



##
File path: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.util.Utils
+
+/**
+ * A class used to keep track of in-memory store usage by the SHS.
+ */
+private class HistoryServerMemoryManager(
+conf: SparkConf) extends Logging {
+
+  private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+logInfo("Initialized memory manager: " +
+  s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+  s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  def lease(
+  appId: String,
+  attemptId: Option[String],
+  eventLogSize: Long,
+  isCompressed: Boolean): Unit = {
+val memoryUsage = approximateMemoryUsage(eventLogSize, isCompressed)
+if (memoryUsage + currentUsage.get > maxUsage) {
+  throw new RuntimeException("Not enough memory to create hybrid store " +
+s"for app $appId / $attemptId.")
+}
+active.synchronized {
+  active(appId -> attemptId) = memoryUsage
+}
+currentUsage.addAndGet(memoryUsage)
+logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " +
+  s"app $appId / $attemptId")
+  }
+
+  def release(appId: String, attemptId: Option[String]): Unit = {
+val memoryUsage = active.synchronized { active.remove(appId -> attemptId) }
+
+memoryUsage match {
+  case Some(m) =>
+currentUsage.addAndGet(-m)
+logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
+  s"app $appId / $attemptId")
+  case None =>
+}
+  }
+
+  private def approximateMemoryUsage(eventLogSize: Long, isCompressed: Boolean): Long = {
+if (isCompressed) {
+  eventLogSize * 2

Review comment:
   This memory usage approximation follows the way we estimate disk usage in HistoryServerDiskManager. In the disk manager, we estimate disk usage based on the log file size. I ran some experiments comparing the memory usage of HybridStore (measured through org.apache.spark.util.SizeEstimator) with the disk usage of LevelDB on a set of log files, and found them to be similar. So I think that if the disk usage approximation works, this kind of memory usage approximation should work too.
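For reference, the measurement described here takes only a couple of lines; a minimal sketch, assuming `store` is an InMemoryStore that has already been populated by replaying an event log:

```scala
import org.apache.spark.util.SizeEstimator

// Estimate the deep size of the populated in-memory store, as described above.
val estimatedBytes: Long = SizeEstimator.estimate(store)
println(s"Approximate in-memory store footprint: $estimatedBytes bytes")
```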








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-15 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r440499121



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore first and use a background thread to dump
+ * data to LevelDB once the app store is restored. We don't expect write
+ * operations (except the case for caching) after switchToLevelDB() is called.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // Flag to indicate whether this hybrid store is closed; used to avoid
+  // starting the background thread after the store is closed
+  private val closed = new AtomicBoolean(false)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+getStore().write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("delete() shouldn't be called after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+closed.set(true)
+
+if (backgroundThread != null && backgroundThread.isAlive()) {
+  // The background thread is still running, wait for it to finish
+  backgroundThread.join()
+}
+
+try {
+  if (levelDB != null) {
+levelDB.close()
+  }
+} catch {
+  case ioe: IOException => throw ioe
+} finally {
+  inMemoryStore.close()
+}
+  }
+
+  override def removeAllByIndexValues[T](
+  klass: Class[T],
+  index: String,
+  indexValues: Collection[_]): Boolean = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("removeAllByIndexValues() shouldn't be " +
+"called after the hybrid store begins switching to levelDB")
+}
+
+getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(listener: HybridStore.SwitchToLevelDBListener): Unit = {
+if (closed.get) {
+  

[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-15 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r440490267



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the app store is restored. We don't expect write
+ * operations (except for caching) after switchToLevelDB() is called.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // Flag to indicate whether this hybrid store is closed; used to avoid
+  // starting the background thread after the store is closed
+  private val closed = new AtomicBoolean(false)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+getStore().write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("delete() shouldn't be called after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+try {
+  closed.set(true)
+  if (backgroundThread != null && backgroundThread.isAlive()) {
+// The background thread is still running, wait for it to finish
+backgroundThread.join()
+  }
+} finally {
+  inMemoryStore.close()
+  if (levelDB != null) {
+levelDB.close()
+  }
+}
+  }
+
+  override def removeAllByIndexValues[T](
+  klass: Class[T],
+  index: String,
+  indexValues: Collection[_]): Boolean = {
+if (backgroundThread != null) {
+  throw new IllegalStateException("removeAllByIndexValues() shouldn't be " +
+"called after the hybrid store begins switching to levelDB")
+}
+
+getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(listener: HybridStore.SwitchToLevelDBListener): Unit = {
+if (closed.get) {
+  return
+}
+
+backgroundThread = new 
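
A minimal sketch of how switchToLevelDB() continues from this point, pieced together from the fuller revision quoted later in this thread; the listener callback names (onSwitchToLevelDBSuccess / onSwitchToLevelDBFail) and the daemon-thread setup are assumptions where the archive does not show them:

def switchToLevelDB(listener: HybridStore.SwitchToLevelDBListener): Unit = {
  if (closed.get) {
    return
  }

  backgroundThread = new Thread(() => {
    var exception: Option[Exception] = None

    try {
      // Dump every class that was written to inMemoryStore before the switch.
      for (klass <- klassMap.keys().asScala) {
        val it = inMemoryStore.view(klass).closeableIterator()
        while (it.hasNext()) {
          levelDB.write(it.next())
        }
        it.close()
      }
    } catch {
      case e: Exception => exception = Some(e)
    }

    exception match {
      case Some(e) =>
        listener.onSwitchToLevelDBFail(e)
      case None =>
        // All data is in levelDB now; route subsequent reads there.
        shouldUseInMemoryStore.set(false)
        listener.onSwitchToLevelDBSuccess()
        inMemoryStore.close()
    }
  })
  backgroundThread.setDaemon(true)
  backgroundThread.start()
}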

[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-15 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r440489973



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1167,6 +1172,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 // At this point the disk data either does not exist or was deleted because it failed to
 // load, so the event log needs to be replayed.
 
+// If hybrid store is enabled, try it first.
+if (hybridStoreEnabled) {
+  try {
+return createHybridStore(dm, appId, attempt, metadata)
+  } catch {
+case e: Exception =>
+  logInfo(s"Failed to create HybridStore for 
$appId/${attempt.info.attemptId}." +
+" Using LevelDB.", e)
+  }
+}
+

Review comment:
   Addressed. Split the logic below into a new function createLevelDBStore().
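
For reference, a sketch of what the split-out helper could look like, assuming the name given above and the helpers already used in FsHistoryProvider (the lease.commit call and the retry shape are assumptions modeled on the pre-existing LevelDB path; the merged code may differ in details):

private def createLevelDBStore(
    dm: HistoryServerDiskManager,
    appId: String,
    attempt: AttemptInfoWrapper,
    metadata: AppStatusStoreMetadata): KVStore = {
  var retried = false
  var newStorePath: File = null
  while (newStorePath == null) {
    val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
      attempt.lastIndex)
    val isCompressed = reader.compressionCodec.isDefined
    val lease = dm.lease(reader.totalSize, isCompressed)
    try {
      // Replay the event log into a LevelDB under the leased temp path.
      Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store =>
        rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
      }
      newStorePath = lease.commit(appId, attempt.info.attemptId)
    } catch {
      case _: IOException if !retried =>
        // The event log may have changed mid-replay; roll back and retry once.
        lease.rollback()
        retried = true
      case e: Exception =>
        lease.rollback()
        throw e
    }
  }
  KVUtils.open(newStorePath, metadata)
}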

##
File path: core/src/main/scala/org/apache/spark/internal/config/History.scala
##
@@ -195,4 +195,20 @@ private[spark] object History {
   .version("3.0.0")
   .booleanConf
   .createWithDefault(true)
+
+  val HYBRID_STORE_ENABLED = ConfigBuilder("spark.history.store.hybridStore.enabled")
+.doc("Whether to use HybridStore as the store when parsing event logs. " +
+  "HybridStore will first write data to an in-memory store and use a background thread " +
+  "to dump data to a disk store after the writing to the in-memory store is completed.")
+.version("3.1.0")
+.booleanConf
+.createWithDefault(false)
+
+  val MAX_IN_MEMORY_STORE_USAGE = ConfigBuilder("spark.history.store.hybridStore.maxMemoryUsage")

Review comment:
   monitoring.md updated.
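
For reference, a sketch of how the setting above can be declared with ConfigBuilder; the byte unit and the "2g" default are assumptions to be checked against the PR's monitoring.md:

// requires: import org.apache.spark.network.util.ByteUnit
val MAX_IN_MEMORY_STORE_USAGE =
  ConfigBuilder("spark.history.store.hybridStore.maxMemoryUsage")
    .doc("Maximum memory space that can be used to create HybridStore. The " +
      "HybridStore co-uses the heap memory, so the heap memory should be " +
      "increased if the HybridStore is enabled.")
    .version("3.1.0")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("2g")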








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-15 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r440489736



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -128,6 +128,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
   private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
 
+  private val hybridStoreEnabled = conf.get(History.HYBRID_STORE_ENABLED)
+
+  private val memoryManager = new HistoryServerMemoryManager(conf)

Review comment:
   Addressed.








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437849260



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+val store = getStore()
+store.write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new RuntimeException("delete() shouldn't be called after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+try {
+  if (backgroundThread != null && backgroundThread.isAlive()) {
+// The background thread is still running, wait for it to finish
+backgroundThread.join()
+  }
+  if (levelDB != null) {

Review comment:
   Sorry I missed this comment. Yeah, you are right, join() can throw InterruptedException. I will update the code.
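
The eventual fix ends up looking like the revision quoted near the top of this thread; as a minimal sketch, closed is set first and the cleanup moves into a finally block so an interrupted join() cannot skip it:

override def close(): Unit = {
  try {
    closed.set(true)
    if (backgroundThread != null && backgroundThread.isAlive()) {
      // Wait for the dump to finish; join() may throw InterruptedException.
      backgroundThread.join()
    }
  } finally {
    inMemoryStore.close()
    if (levelDB != null) {
      levelDB.close()
    }
  }
}

Setting closed before joining also lets switchToLevelDB() bail out if it races with close().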








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437837726



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1263,17 +1241,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 "trying again...")
   store.close()
   memoryManager.release(appId, attempt.info.attemptId)
-  lease.rollback()
   retried = true
-
 case e: Exception =>
   store.close()
   memoryManager.release(appId, attempt.info.attemptId)
-  lease.rollback()
   throw e
   }
 }
 
+// Create a LevelDB and start a background thread to dump data to LevelDB
+logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")

Review comment:
   ok, will update it. Thanks








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437837248



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1263,17 +1241,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 "trying again...")
   store.close()
   memoryManager.release(appId, attempt.info.attemptId)
-  lease.rollback()
   retried = true
-
 case e: Exception =>
   store.close()
   memoryManager.release(appId, attempt.info.attemptId)
-  lease.rollback()
   throw e
   }
 }
 
+// Create a LevelDB and start a background thread to dump data to LevelDB
+logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")

Review comment:
   do you mean an exception thrown in the foreground thread or in the background thread? If it's in the foreground thread, we can use a try-catch block to close the store when an exception is thrown.
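
A minimal sketch of the foreground-thread handling being suggested, using the names from the surrounding diff:

val store = new HybridStore()
val levelDB = KVUtils.open(lease.tmpPath, metadata)
store.setLevelDB(levelDB)
try {
  rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
} catch {
  case e: Exception =>
    // Foreground failure: close the store and give back the disk lease.
    store.close()
    lease.rollback()
    throw e
}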








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437811279



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1197,6 +1213,78 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 KVUtils.open(newStorePath, metadata)
   }
 
+  private def createHybridStore(
+  dm: HistoryServerDiskManager,
+  appId: String,
+  attempt: AttemptInfoWrapper,
+  metadata: AppStatusStoreMetadata): KVStore = {
+
+var retried = false
+var hybridStore: HybridStore = null
+while (hybridStore == null) {
+  val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+attempt.lastIndex)
+  val isCompressed = reader.compressionCodec.isDefined
+
+  // Throws an exception if the memory space is not enough
+  memoryManager.lease(appId, attempt.info.attemptId, reader.totalSize, isCompressed)
+
+  logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
+  val lease = dm.lease(reader.totalSize, isCompressed)
+  val isLeaseRolledBack = new java.util.concurrent.atomic.AtomicBoolean(false)
+  var store: HybridStore = null
+  try {
+store = new HybridStore()
+val levelDB = KVUtils.open(lease.tmpPath, metadata)
+store.setLevelDB(levelDB)
+rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
+
+// Start the background thread to dump data to levelDB when writing to
+// InMemoryStore is completed.
+store.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {

Review comment:
   yeah, that makes sense, I will update the code.
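
A sketch of the listener wiring under discussion; the callback names (onSwitchToLevelDBSuccess / onSwitchToLevelDBFail) are assumptions based on the trait referenced in the diff, and the commit-then-reopen step mirrors how the disk lease is used elsewhere in this file:

store.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {
  override def onSwitchToLevelDBSuccess(): Unit = {
    logInfo(s"Completely switched to LevelDB for app $appId / ${attempt.info.attemptId}.")
    levelDB.close()
    // Commit the leased directory so the LevelDB survives for later loads.
    val newStorePath = lease.commit(appId, attempt.info.attemptId)
    store.setLevelDB(KVUtils.open(newStorePath, metadata))
    memoryManager.release(appId, attempt.info.attemptId)
  }

  override def onSwitchToLevelDBFail(e: Exception): Unit = {
    logWarning(s"Failed to switch to LevelDB for app $appId / ${attempt.info.attemptId}", e)
    levelDB.close()
    lease.rollback()
  }
})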








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437608213



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+val store = getStore()
+store.write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new RuntimeException("delete() shouldn't be called after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+try {
+  if (backgroundThread != null && backgroundThread.isAlive()) {
+// The background thread is still running, wait for it to finish
+backgroundThread.join()
+  }
+  if (levelDB != null) {
+levelDB.close()
+  }
+} catch {
+  case ioe: IOException => throw ioe
+} finally {
+  inMemoryStore.close()
+}
+  }
+
+  override def removeAllByIndexValues[T](
+  klass: Class[T],
+  index: String,
+  indexValues: Collection[_]): Boolean = {
+if (backgroundThread != null) {
+  throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(listener: HybridStore.SwitchToLevelDBListener): Unit = {
+backgroundThread = new Thread(() => {
+  var exception: Option[Exception] = None
+
+  try {
+for (klass <- klassMap.keys().asScala) {
+  val it = inMemoryStore.view(klass).closeableIterator()
+  while (it.hasNext()) {
+levelDB.write(it.next())
+  }

[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437602431



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+val store = getStore()
+store.write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)

Review comment:
   write operations are only allowed for CacheQuantile objects after rebuildAppStore() is finished. If we want to throw IllegalStateException here, we would need special logic to check whether the value is of class CacheQuantile. I think we would prefer to avoid that, to keep HybridStore as generic as possible.
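
The special case being avoided would look roughly like this, with CacheQuantile standing in for the cached-data class mentioned above (a hypothetical sketch, not the PR's code):

override def write(value: Object): Unit = {
  if (backgroundThread != null && !value.isInstanceOf[CacheQuantile]) {
    throw new IllegalStateException("write() of non-cache data isn't allowed " +
      "after the hybrid store begins switching to levelDB")
  }
  getStore().write(value)
}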








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437600535



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1167,6 +1172,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 // At this point the disk data either does not exist or was deleted because it failed to
 // load, so the event log needs to be replayed.
 
+// If hybrid store is enabled, try it first.
+if (hybridStoreEnabled) {
+  try {
+return createHybridStore(dm, appId, attempt, metadata)
+  } catch {
+case e: Exception =>
+  logInfo(s"Failed to create hybrid store for 
$appId/${attempt.info.attemptId}." +

Review comment:
   yeah, I think logInfo is ok since levelDB would work. And this usually occurs when the memory space is not enough.








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437598233



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1197,6 +1213,78 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 KVUtils.open(newStorePath, metadata)
   }
 
+  private def createHybridStore(
+  dm: HistoryServerDiskManager,
+  appId: String,
+  attempt: AttemptInfoWrapper,
+  metadata: AppStatusStoreMetadata): KVStore = {
+
+var retried = false
+var hybridStore: HybridStore = null
+while (hybridStore == null) {
+  val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+attempt.lastIndex)
+  val isCompressed = reader.compressionCodec.isDefined
+
+  // Throws an exception if the memory space is not enough
+  memoryManager.lease(appId, attempt.info.attemptId, reader.totalSize, isCompressed)
+
+  logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
+  val lease = dm.lease(reader.totalSize, isCompressed)
+  val isLeaseRolledBack = new java.util.concurrent.atomic.AtomicBoolean(false)
+  var store: HybridStore = null
+  try {
+store = new HybridStore()
+val levelDB = KVUtils.open(lease.tmpPath, metadata)
+store.setLevelDB(levelDB)
+rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
+
+// Start the background thread to dump data to levelDB when writing to
+// InMemoryStore is completed.
+store.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {

Review comment:
   I would prefer to keep store.switchToLevelDB() in the try block and remove the isLeaseRolledBack variable. If we move it out of the try block, we would need one more variable to track whether we encountered an exception during rebuildAppStore(); otherwise store.switchToLevelDB() would be executed even when rebuildAppStore() fails due to an IOException. What do you think?
   
   I added a closed variable to indicate whether the hybrid store is closed, and the background thread is not started when closed is true. This addresses the issue above.
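
Sketched out, the rejected alternative needs an extra success flag so a rebuild failure (for example, the retried IOException case in this method) does not trigger the switch; the names come from the surrounding diff, and listener stands in for the SwitchToLevelDBListener shown earlier:

var rebuilt = false
try {
  rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
  rebuilt = true
} catch {
  case _: IOException if !retried =>
    // The log changed mid-replay; clean up and let the outer loop retry.
    store.close()
    lease.rollback()
    retried = true
}
if (rebuilt) {
  store.switchToLevelDB(listener)
}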








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437581083



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.util.Utils
+
+/**
+ * A class used to keep track of in-memory store usage by the SHS.
+ */
+private class HistoryServerMemoryManager(
+conf: SparkConf) extends Logging {
+
+  private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+logInfo("Initialized memory manager: " +
+  s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+  s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  def lease(
+  appId: String,
+  attemptId: Option[String],
+  eventLogSize: Long,
+  isCompressed: Boolean): Unit = {
+val memoryUsage = approximateMemoryUsage(eventLogSize, isCompressed)
+if (memoryUsage + currentUsage.get > maxUsage) {
+  throw new RuntimeException("Not enough memory to create hybrid store " +
+s"for app $appId / $attemptId.")
+}
+active.synchronized {
+  active(appId -> attemptId) = memoryUsage
+}
+currentUsage.addAndGet(memoryUsage)
+logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " +
+  s"app $appId / $attemptId")
+  }
+
+  def release(appId: String, attemptId: Option[String]): Unit = {
+val memoryUsage = active.synchronized { active.remove(appId -> attemptId) }
+
+memoryUsage match {
+  case Some(m) =>
+currentUsage.addAndGet(-m)
+logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
+  s"app $appId / $attemptId")
+  case None =>
+}
+  }
+
+  def approximateMemoryUsage(eventLogSize: Long, isCompressed: Boolean): Long = {

Review comment:
   private added.
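
A short usage sketch of the manager above; the application identifiers and event log size are illustrative:

val conf = new SparkConf()
val memoryManager = new HistoryServerMemoryManager(conf)
memoryManager.initialize()

val appId = "app-20200609120000-0000"
val attemptId: Option[String] = Some("1")

// Throws if admitting this event log would push usage past maxUsage.
memoryManager.lease(appId, attemptId, eventLogSize = 1L << 28, isCompressed = false)
try {
  // ... replay the event log into an InMemoryStore here ...
} finally {
  memoryManager.release(appId, attemptId)
}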








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437580626



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.

Review comment:
   class doc updated.








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437580889



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+val store = getStore()
+store.write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new RuntimeException("delete() shouldn't be called after " +

Review comment:
   updated to IllegalStateException

##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null

[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437580420



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that had been writen to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+val store = getStore()
+store.write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new RuntimeException("delete() shouldn't be called after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+try {
+  if (backgroundThread != null && backgroundThread.isAlive()) {
+// The background thread is still running, wait for it to finish
+backgroundThread.join()
+  }
+  if (levelDB != null) {

Review comment:
   In the current implementation, the background thread won't throw uncaught exceptions. So I think levelDB.close() is guaranteed to be executed. Here the try-catch block is trying to catch the IOException that might be thrown during levelDB.close().
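
Since the catch clause rethrows the IOException unchanged and the finally block runs either way, the method is equivalent to this simpler shape (a sketch for illustration, not a proposed change):

override def close(): Unit = {
  try {
    if (backgroundThread != null && backgroundThread.isAlive()) {
      backgroundThread.join()
    }
    if (levelDB != null) {
      levelDB.close()
    }
  } finally {
    inMemoryStore.close()
  }
}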








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437578109



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+val store = getStore()
+store.write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new RuntimeException("delete() shouldn't be called after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+try {
+  if (backgroundThread != null && backgroundThread.isAlive()) {
+// The background thread is still running, wait for it to finish
+backgroundThread.join()
+  }
+  if (levelDB != null) {
+levelDB.close()
+  }
+} catch {
+  case ioe: IOException => throw ioe
+} finally {
+  inMemoryStore.close()
+}
+  }
+
+  override def removeAllByIndexValues[T](
+  klass: Class[T],
+  index: String,
+  indexValues: Collection[_]): Boolean = {
+if (backgroundThread != null) {
+  throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(listener: HybridStore.SwitchToLevelDBListener): Unit = {
+backgroundThread = new Thread(() => {
+  var exception: Option[Exception] = None
+
+  try {
+for (klass <- klassMap.keys().asScala) {
+  val it = inMemoryStore.view(klass).closeableIterator()
+  while (it.hasNext()) {
+levelDB.write(it.next())
+  }

[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437578179



##
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+val store = getStore()
+store.write(value)
+
+if (backgroundThread == null) {
+  // New classes won't be dumped once the background thread is started
+  klassMap.putIfAbsent(value.getClass(), true)
+}
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+if (backgroundThread != null) {
+  throw new RuntimeException("delete() shouldn't be called after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+try {
+  if (backgroundThread != null && backgroundThread.isAlive()) {
+// The background thread is still running, wait for it to finish
+backgroundThread.join()
+  }
+  if (levelDB != null) {
+levelDB.close()
+  }
+} catch {
+  case ioe: IOException => throw ioe
+} finally {
+  inMemoryStore.close()
+}
+  }
+
+  override def removeAllByIndexValues[T](
+  klass: Class[T],
+  index: String,
+  indexValues: Collection[_]): Boolean = {
+if (backgroundThread != null) {
+  throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +
+"the hybrid store begins switching to levelDB")
+}
+
+getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  def switchToLevelDB(listener: HybridStore.SwitchToLevelDBListener): Unit = {
+backgroundThread = new Thread(() => {
+  var exception: Option[Exception] = None
+
+  try {
+for (klass <- klassMap.keys().asScala) {
+  val it = inMemoryStore.view(klass).closeableIterator()
+  while (it.hasNext()) {
+levelDB.write(it.next())
+  }

[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r437577822




[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-06-09 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r436796185



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1197,6 +1213,71 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     KVUtils.open(newStorePath, metadata)
   }
 
+  private def createHybridStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper,
+      metadata: AppStatusStoreMetadata): KVStore = {
+
+    val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+      attempt.lastIndex)
+    val isCompressed = reader.compressionCodec.isDefined
+
+    val memoryUsage = approximateMemoryUsage(reader.totalSize, isCompressed)
+    if (currentInMemoryStoreUsage.get + memoryUsage > maxInMemoryStoreUsage) {
+      throw new IllegalStateException("Not enough in-memory storage to create hybrid store.")
+    }
+    currentInMemoryStoreUsage.addAndGet(memoryUsage)
+    logInfo(s"Attempt creating hybrid store to parse $appId / ${attempt.info.attemptId}. " +
+      s"Requested ${Utils.bytesToString(memoryUsage)} in-memory storage quota.")
+
+    logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
+    val lease = dm.lease(reader.totalSize, isCompressed)
+    val isLeaseRolledBack = new java.util.concurrent.atomic.AtomicBoolean(false)
+    var store: HybridStore = null
+    try {
+      store = new HybridStore()
+      val levelDB = KVUtils.open(lease.tmpPath, metadata)
+      store.setLevelDB(levelDB)
+      store.setCachedQuantileKlass(classOf[CachedQuantile])

Review comment:
   OK, will do that. I am moving it to the org.apache.spark.deploy.history package.

##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1197,6 +1213,71 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     KVUtils.open(newStorePath, metadata)
   }
 
+  private def createHybridStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper,
+      metadata: AppStatusStoreMetadata): KVStore = {
+
+    val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+      attempt.lastIndex)
+    val isCompressed = reader.compressionCodec.isDefined
+
+    val memoryUsage = approximateMemoryUsage(reader.totalSize, isCompressed)
+    if (currentInMemoryStoreUsage.get + memoryUsage > maxInMemoryStoreUsage) {
+      throw new IllegalStateException("Not enough in-memory storage to create hybrid store.")
+    }
+    currentInMemoryStoreUsage.addAndGet(memoryUsage)
+    logInfo(s"Attempt creating hybrid store to parse $appId / ${attempt.info.attemptId}. " +
+      s"Requested ${Utils.bytesToString(memoryUsage)} in-memory storage quota.")
+
+    logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
+    val lease = dm.lease(reader.totalSize, isCompressed)
+    val isLeaseRolledBack = new java.util.concurrent.atomic.AtomicBoolean(false)
+    var store: HybridStore = null
+    try {
+      store = new HybridStore()
+      val levelDB = KVUtils.open(lease.tmpPath, metadata)
+      store.setLevelDB(levelDB)
+      store.setCachedQuantileKlass(classOf[CachedQuantile])
+      rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
+
+      // Start the background thread to dump data to levelDB when writing to
+      // InMemoryStore is completed.
+      store.switchingToLevelDB(new HybridStore.SwitchingToLevelDBListener {
+        override def onSwitchingToLevelDBSuccess: Unit = {
+          levelDB.close()
+          val newStorePath = lease.commit(appId, attempt.info.attemptId)
+          store.setLevelDB(KVUtils.open(newStorePath, metadata))
+          currentInMemoryStoreUsage.addAndGet(-memoryUsage)
+          logInfo(s"Completely switched to leveldb for app" +
+            s" $appId / ${attempt.info.attemptId}. " +
+            s"Released ${Utils.bytesToString(memoryUsage)} in-memory storage quota.")
+        }
+
+        override def onSwitchingToLevelDBFail(e: Exception): Unit = {
+          logWarning(s"Failed to switch to leveldb for app" +
+            s" $appId / ${attempt.info.attemptId}", e)
+          levelDB.close()
+          if (!isLeaseRolledBack.getAndSet(true)) {
+            lease.rollback()
+          }
+        }
+      })
+
+      store
+    } catch {
+      case e: Exception =>
+        store.close()
+        currentInMemoryStoreUsage.addAndGet(-memoryUsage)
Review comment:
   decrementAndGet() only decrements by one; there is no decrementAndGet(long n) method.
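
(For reference: java.util.concurrent.atomic.AtomicLong exposes addAndGet(long delta)
for arbitrary deltas, so the quota is released by adding a negative value. A minimal
sketch, with a made-up quota size:)

    import java.util.concurrent.atomic.AtomicLong

    val currentInMemoryStoreUsage = new AtomicLong(0L)
    val memoryUsage = 64L * 1024 * 1024                // hypothetical 64 MB quota

    currentInMemoryStoreUsage.addAndGet(memoryUsage)   // reserve the quota
    currentInMemoryStoreUsage.addAndGet(-memoryUsage)  // release it; no decrementAndGet(long) exists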

##
File path: core/src/main/scala/org/apache/spark/internal/config/History.scala
##
@@ -195,4 +195,18 @@ private[spark] object History {
     .version("3.0.0")
     .booleanConf
     .createWithDefault(true)
+
+  val HYBRID_STORE_ENABLED = 

[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-05-05 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r420531106



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1167,6 +1168,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // At this point the disk data either does not exist or was deleted because it failed to
     // load, so the event log needs to be replayed.
 
+    // TODO: Maybe need to do other check to see if there's enough memory to
+    // use inMemoryStore.
+    if (hybridKVStoreEnabled) {
+      logInfo("Using HybridKVStore as KVStore")
+      var retried = false
+      var store: HybridKVStore = null
+      while (store == null) {
+        val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+          attempt.lastIndex)
+        val isCompressed = reader.compressionCodec.isDefined
+        logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
+        val lease = dm.lease(reader.totalSize, isCompressed)
+        try {
+          val s = new HybridKVStore()
+          val levelDB = KVUtils.open(lease.tmpPath, metadata)
+          s.setLevelDB(levelDB)
+
+          s.startBackgroundThreadToWriteToDB(new HybridKVStore.SwitchingToLevelDBListener {
+            override def onSwitchingToLevelDBSuccess: Unit = {
+              levelDB.close()
+              val newStorePath = lease.commit(appId, attempt.info.attemptId)
+              s.setLevelDB(KVUtils.open(newStorePath, metadata))
+              logInfo(s"Completely switched to use leveldb for app" +
+                s" $appId / ${attempt.info.attemptId}")
+            }
+
+            override def onSwitchingToLevelDBFail(e: Exception): Unit = {
+              logWarning(s"Failed to switch to use LevelDb for app" +
+                s" $appId / ${attempt.info.attemptId}")
+              levelDB.close()
+              throw e

Review comment:
   1 is addressed.








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-05-05 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r420530840



##
File path: core/src/main/scala/org/apache/spark/internal/config/History.scala
##
@@ -195,4 +195,9 @@ private[spark] object History {
     .version("3.0.0")
     .booleanConf
     .createWithDefault(true)
+
+  val HYBRID_KVSTORE_ENABLED = ConfigBuilder("spark.history.store.hybridKVStore.enabled")
+    .version("3.0.1")

Review comment:
   Addressed.








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-05-05 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r420525489



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1167,6 +1168,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // At this point the disk data either does not exist or was deleted because it failed to
     // load, so the event log needs to be replayed.
 
+    // TODO: Maybe need to do other check to see if there's enough memory to
+    // use inMemoryStore.
+    if (hybridKVStoreEnabled) {
+      logInfo("Using HybridKVStore as KVStore")
+      var retried = false
+      var store: HybridKVStore = null
+      while (store == null) {
+        val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+          attempt.lastIndex)
+        val isCompressed = reader.compressionCodec.isDefined
+        logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
+        val lease = dm.lease(reader.totalSize, isCompressed)
+        try {
+          val s = new HybridKVStore()
+          val levelDB = KVUtils.open(lease.tmpPath, metadata)
+          s.setLevelDB(levelDB)
+
+          s.startBackgroundThreadToWriteToDB(new HybridKVStore.SwitchingToLevelDBListener {
+            override def onSwitchingToLevelDBSuccess: Unit = {
+              levelDB.close()
+              val newStorePath = lease.commit(appId, attempt.info.attemptId)
+              s.setLevelDB(KVUtils.open(newStorePath, metadata))
+              logInfo(s"Completely switched to use leveldb for app" +
+                s" $appId / ${attempt.info.attemptId}")
+            }
+
+            override def onSwitchingToLevelDBFail(e: Exception): Unit = {
+              logWarning(s"Failed to switch to use LevelDb for app" +
+                s" $appId / ${attempt.info.attemptId}")
+              levelDB.close()
+              throw e
+            }
+          })
+
+          rebuildAppStore(s, reader, attempt.info.lastUpdated.getTime())
+          s.stopBackgroundThreadAndSwitchToLevelDB()
+          store = s
+        } catch {
+          case _: IOException if !retried =>
+            // compaction may touch the file(s) which app rebuild wants to read
+            // compaction wouldn't run in short interval, so try again...
+            logWarning(s"Exception occurred while rebuilding app $appId - trying again...")
+            lease.rollback()

Review comment:
   The hybrid kvstore is now cleaned up explicitly.
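
(The explicit cleanup matches the catch block in the later createHybridStore
revision quoted earlier; a sketch of its shape, with the rethrow assumed:)

    } catch {
      case e: Exception =>
        store.close()                                      // drop the in-memory data
        currentInMemoryStoreUsage.addAndGet(-memoryUsage)  // return the memory quota
        if (!isLeaseRolledBack.getAndSet(true)) {
          lease.rollback()                                 // free the leased disk space
        }
        throw e
    }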








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-05-05 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r420524061



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1167,6 +1168,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // At this point the disk data either does not exist or was deleted because it failed to
     // load, so the event log needs to be replayed.
 
+    // TODO: Maybe need to do other check to see if there's enough memory to
+    // use inMemoryStore.
+    if (hybridKVStoreEnabled) {

Review comment:
   I found it difficult to pass a Lease to the hybrid kvstore, since the Lease class is only visible within the history package.
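
(The constraint comes from Scala's package-qualified access modifier: a
private[history] class is visible under org.apache.spark.deploy.history but not
from the kvstore utilities. A self-contained illustration with hypothetical names:)

    package org.apache.spark.deploy {
      package history {
        private[history] class Lease  // visible only inside ...deploy.history
      }

      object Outside {
        // new history.Lease          // would not compile: Lease is private[history]
      }
    }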








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-05-05 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r420463955



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1167,6 +1168,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // At this point the disk data either does not exist or was deleted because it failed to
     // load, so the event log needs to be replayed.
 
+    // TODO: Maybe need to do other check to see if there's enough memory to
+    // use inMemoryStore.
+    if (hybridKVStoreEnabled) {
+      logInfo("Using HybridKVStore as KVStore")
+      var retried = false
+      var store: HybridKVStore = null
+      while (store == null) {
+        val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+          attempt.lastIndex)
+        val isCompressed = reader.compressionCodec.isDefined
+        logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
+        val lease = dm.lease(reader.totalSize, isCompressed)
+        try {
+          val s = new HybridKVStore()
+          val levelDB = KVUtils.open(lease.tmpPath, metadata)
+          s.setLevelDB(levelDB)
+
+          s.startBackgroundThreadToWriteToDB(new HybridKVStore.SwitchingToLevelDBListener {
+            override def onSwitchingToLevelDBSuccess: Unit = {
+              levelDB.close()
+              val newStorePath = lease.commit(appId, attempt.info.attemptId)
+              s.setLevelDB(KVUtils.open(newStorePath, metadata))
+              logInfo(s"Completely switched to use leveldb for app" +
+                s" $appId / ${attempt.info.attemptId}")
+            }
+
+            override def onSwitchingToLevelDBFail(e: Exception): Unit = {
+              logWarning(s"Failed to switch to use LevelDb for app" +
+                s" $appId / ${attempt.info.attemptId}")
+              levelDB.close()
+              throw e

Review comment:
   Will address 1. For 2, if switching to leveldb fails, the hybrid kvstore will always use the in-memory kvstore as its underlying kvstore.








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-05-05 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r420462651



##
File path: core/src/main/scala/org/apache/spark/internal/config/History.scala
##
@@ -195,4 +195,9 @@ private[spark] object History {
     .version("3.0.0")
     .booleanConf
     .createWithDefault(true)
+
+  val HYBRID_KVSTORE_ENABLED = ConfigBuilder("spark.history.store.hybridKVStore.enabled")
+    .version("3.0.1")

Review comment:
   Will address it.
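
(The review point is that a new config should carry the version of the release it
first ships in. The addressed definition presumably looks like the sketch below;
the 3.1.0 version, the hybridStore key spelling, and the false default are
assumptions based on the later HYBRID_STORE_ENABLED rename, not a quote of the
committed code:)

    val HYBRID_STORE_ENABLED = ConfigBuilder("spark.history.store.hybridStore.enabled")
      .version("3.1.0")
      .booleanConf
      .createWithDefault(false)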








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-05-05 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r420462494



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1167,6 +1168,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // At this point the disk data either does not exist or was deleted because it failed to
     // load, so the event log needs to be replayed.
 
+    // TODO: Maybe need to do other check to see if there's enough memory to
+    // use inMemoryStore.
+    if (hybridKVStoreEnabled) {

Review comment:
   Agree. I will address it.








[GitHub] [spark] baohe-zhang commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

2020-05-05 Thread GitBox


baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r420461909



##
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##
@@ -1167,6 +1168,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // At this point the disk data either does not exist or was deleted because it failed to
     // load, so the event log needs to be replayed.
 
+    // TODO: Maybe need to do other check to see if there's enough memory to
+    // use inMemoryStore.
+    if (hybridKVStoreEnabled) {
+      logInfo("Using HybridKVStore as KVStore")
+      var retried = false
+      var store: HybridKVStore = null
+      while (store == null) {
+        val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+          attempt.lastIndex)
+        val isCompressed = reader.compressionCodec.isDefined
+        logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
+        val lease = dm.lease(reader.totalSize, isCompressed)
+        try {
+          val s = new HybridKVStore()
+          val levelDB = KVUtils.open(lease.tmpPath, metadata)
+          s.setLevelDB(levelDB)
+
+          s.startBackgroundThreadToWriteToDB(new HybridKVStore.SwitchingToLevelDBListener {
+            override def onSwitchingToLevelDBSuccess: Unit = {
+              levelDB.close()
+              val newStorePath = lease.commit(appId, attempt.info.attemptId)
+              s.setLevelDB(KVUtils.open(newStorePath, metadata))
+              logInfo(s"Completely switched to use leveldb for app" +
+                s" $appId / ${attempt.info.attemptId}")
+            }
+
+            override def onSwitchingToLevelDBFail(e: Exception): Unit = {
+              logWarning(s"Failed to switch to use LevelDb for app" +
+                s" $appId / ${attempt.info.attemptId}")
+              levelDB.close()
+              throw e
+            }
+          })
+
+          rebuildAppStore(s, reader, attempt.info.lastUpdated.getTime())
+          s.stopBackgroundThreadAndSwitchToLevelDB()
+          store = s
+        } catch {
+          case _: IOException if !retried =>
+            // compaction may touch the file(s) which app rebuild wants to read
+            // compaction wouldn't run in short interval, so try again...
+            logWarning(s"Exception occurred while rebuilding app $appId - trying again...")
+            lease.rollback()

Review comment:
   I didn't quite understand this comment; could you elaborate? I think that in the current implementation, if any exceptions are thrown while migrating data to leveldb, the hybrid kvstore will not switch to leveldb, and its getStore() method will always return the in-memory kvstore.
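
(In other words, the failure path never flips the routing flag, so getStore() keeps
returning the in-memory store. A sketch of that invariant at the end of the
background dump thread, assuming the shouldUseInMemoryStore flag and the listener
callbacks quoted above:)

    exception match {
      case Some(e) =>
        // Dump failed: shouldUseInMemoryStore stays true, so reads keep
        // hitting inMemoryStore; just report the failure.
        listener.onSwitchingToLevelDBFail(e)
      case None =>
        // Dump succeeded: route future operations to levelDB.
        shouldUseInMemoryStore.set(false)
        listener.onSwitchingToLevelDBSuccess
    }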




