HeartSaVioR commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r419768275
##########
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1167,6 +1168,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// At this point the disk data either does not exist or was deleted because it failed to
// load, so the event log needs to be replayed.
+ // TODO: Maybe need to do other check to see if there's enough memory to
+ // use inMemoryStore.
+ if (hybridKVStoreEnabled) {
+ logInfo("Using HybridKVStore as KVStore")
+ var retried = false
+ var store: HybridKVStore = null
+ while(store == null) {
+ val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+ attempt.lastIndex)
+ val isCompressed = reader.compressionCodec.isDefined
+ logInfo(s"Leasing disk manager space for app $appId /
${attempt.info.attemptId}...")
+ val lease = dm.lease(reader.totalSize, isCompressed)
+ try {
+ val s = new HybridKVStore()
+ val levelDB = KVUtils.open(lease.tmpPath, metadata)
+ s.setLevelDB(levelDB)
+
+ s.startBackgroundThreadToWriteToDB(new HybridKVStore.SwitchingToLevelDBListener {
+ override def onSwitchingToLevelDBSuccess: Unit = {
+ levelDB.close()
+ val newStorePath = lease.commit(appId, attempt.info.attemptId)
+ s.setLevelDB(KVUtils.open(newStorePath, metadata))
+ logInfo(s"Completely switched to use leveldb for app" +
+ s" $appId / ${attempt.info.attemptId}")
+ }
+
+ override def onSwitchingToLevelDBFail(e: Exception): Unit = {
+ logWarning(s"Failed to switch to use LevelDb for app" +
+ s" $appId / ${attempt.info.attemptId}")
+ levelDB.close()
+ throw e
Review comment:
1. We should roll back the lease here as well.
2. SHS assumes the KVStore has been loaded properly once the caller method returns. If I understand correctly, an exception thrown from this listener does not propagate to the caller, so HybridKVStore still has to serve the content somehow, i.e. from the in-memory store in this case. (Be careful about the memory usage this implies.) A rough sketch of both points follows below.
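To illustrate, here is a minimal sketch of what the failure path could do instead of rethrowing. The helper name and parameters are hypothetical; `appId`, the attempt id, the LevelDB handle and the lease come from the PR's enclosing scope and are only passed in here to keep the sketch self-contained.

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.util.kvstore.LevelDB

// Sketch only: suggested failure handling for the switching listener.
object SwitchFailureSketch extends Logging {
  def onSwitchFail(
      e: Exception,
      appId: String,
      attemptId: Option[String],
      levelDB: LevelDB,
      rollbackLease: () => Unit): Unit = {
    logWarning(s"Failed to switch to LevelDB for app $appId / ${attemptId.getOrElse("")}", e)
    // Close the partially written LevelDB backed by the leased temp directory...
    levelDB.close()
    // ...then give the leased space back (point 1: roll back the lease as well).
    rollbackLease()
    // Point 2: do NOT rethrow. The exception would not reach the caller of
    // rebuildAppStore anyway, so the hybrid store has to keep serving reads from
    // its in-memory store, which keeps the whole app state pinned in memory.
  }
}
```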
##########
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1167,6 +1168,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// At this point the disk data either does not exist or was deleted because it failed to
// load, so the event log needs to be replayed.
+ // TODO: Maybe need to do other check to see if there's enough memory to
+ // use inMemoryStore.
+ if (hybridKVStoreEnabled) {
+ logInfo("Using HybridKVStore as KVStore")
+ var retried = false
+ var store: HybridKVStore = null
+ while(store == null) {
+ val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+ attempt.lastIndex)
+ val isCompressed = reader.compressionCodec.isDefined
+ logInfo(s"Leasing disk manager space for app $appId /
${attempt.info.attemptId}...")
+ val lease = dm.lease(reader.totalSize, isCompressed)
+ try {
+ val s = new HybridKVStore()
+ val levelDB = KVUtils.open(lease.tmpPath, metadata)
+ s.setLevelDB(levelDB)
+
+ s.startBackgroundThreadToWriteToDB(new HybridKVStore.SwitchingToLevelDBListener {
+ override def onSwitchingToLevelDBSuccess: Unit = {
+ levelDB.close()
+ val newStorePath = lease.commit(appId, attempt.info.attemptId)
+ s.setLevelDB(KVUtils.open(newStorePath, metadata))
+ logInfo(s"Completely switched to use leveldb for app" +
+ s" $appId / ${attempt.info.attemptId}")
+ }
+
+ override def onSwitchingToLevelDBFail(e: Exception): Unit = {
+ logWarning(s"Failed to switch to use LevelDb for app" +
+ s" $appId / ${attempt.info.attemptId}")
+ levelDB.close()
+ throw e
+ }
+ })
+
+ rebuildAppStore(s, reader, attempt.info.lastUpdated.getTime())
+ s.stopBackgroundThreadAndSwitchToLevelDB()
+ store = s
+ } catch {
+ case _: IOException if !retried =>
+ // compaction may touch the file(s) which app rebuild wants to read
+ // compaction wouldn't run in short interval, so try again...
+ logWarning(s"Exception occurred while rebuilding app $appId -
trying again...")
+ lease.rollback()
Review comment:
`lease.rollback()` used to be enough to throw away the intermediate LevelDB KVStore in the temporary directory when it failed to load for any reason. That doesn't look sufficient for the HybridKVStore case; see the sketch below for the cleanup order this implies.
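As a generic illustration only (hypothetical helper names, not the PR's API): the store that the hybrid path opened on the leased temp directory should be closed before the lease is rolled back, so the rollback does not race with a live handle or the background writer.

```scala
import java.io.IOException

// Sketch only: generic shape of the retry/cleanup path. openTempStore, replay and
// rollbackLease stand in for KVUtils.open(lease.tmpPath, ...), rebuildAppStore(...)
// and lease.rollback() from the PR.
def replayWithRollback[T <: AutoCloseable](
    openTempStore: () => T,
    replay: T => Unit,
    rollbackLease: () => Unit): Unit = {
  val tempStore = openTempStore()
  try {
    replay(tempStore)
  } catch {
    case e: IOException =>
      // Close the store living in the leased temp directory first...
      tempStore.close()
      // ...and only then throw the leased space away.
      rollbackLease()
      throw e
  }
}
```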
##########
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1167,6 +1168,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// At this point the disk data either does not exist or was deleted because it failed to
// load, so the event log needs to be replayed.
+ // TODO: Maybe need to do other check to see if there's enough memory to
+ // use inMemoryStore.
+ if (hybridKVStoreEnabled) {
Review comment:
I have a feeling that too many HybridKVStore implementation details are exposed here; they could be abstracted away if we had a proper interface for a disk-based, loading-only KVStore for the SHS.
E.g. suppose a new interface on top of KVStore receives a Lease in `initialize()` and exposes `commit()` & `rollback()` to handle the implementation details for each condition; a rough sketch of such an interface follows below.
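As a rough illustration only (the trait name and signatures are hypothetical, not an existing Spark API), the kind of facade being suggested might look like:

```scala
import java.io.File
import org.apache.spark.util.kvstore.KVStore

// Hypothetical sketch: a loading-only, disk-backed KVStore facade for the SHS.
// FsHistoryProvider would only see initialize()/commit()/rollback(); the hybrid
// in-memory + LevelDB switching would live behind this interface.
trait LeasedAppStore {
  /** The KVStore that serves the UI while (and after) the event log is replayed. */
  def store: KVStore

  /** Take ownership of the leased temp directory the disk manager handed out. */
  def initialize(leasedTmpPath: File): Unit

  /** Promote the temp store to its final location once replay has succeeded. */
  def commit(appId: String, attemptId: Option[String]): Unit

  /** Throw away any intermediate on-disk state when replay or switching fails. */
  def rollback(): Unit
}
```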
##########
File path: core/src/main/scala/org/apache/spark/internal/config/History.scala
##########
@@ -195,4 +195,9 @@ private[spark] object History {
.version("3.0.0")
.booleanConf
.createWithDefault(true)
+
+ val HYBRID_KVSTORE_ENABLED = ConfigBuilder("spark.history.store.hybridKVStore.enabled")
+ .version("3.0.1")
Review comment:
1. This needs a doc describing the functionality, along with a proper caution about the memory usage; see the sketch below.
2. "3.1.0" is the correct version as of now.