Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/18887#discussion_r139346721
--- Diff:
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -117,17 +122,37 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
// used for logging msgs (logs are re-scanned based on file size, rather
than modtime)
private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
- // Mapping of application IDs to their metadata, in descending end time
order. Apps are inserted
- // into the map in order, so the LinkedHashMap maintains the correct
ordering.
- @volatile private var applications: mutable.LinkedHashMap[String,
FsApplicationHistoryInfo]
- = new mutable.LinkedHashMap()
+ private val pendingReplayTasksCount = new
java.util.concurrent.atomic.AtomicInteger(0)
- val fileToAppInfo = new ConcurrentHashMap[Path,
FsApplicationAttemptInfo]()
+ private val storePath = conf.get(LOCAL_STORE_DIR)
- // List of application logs to be deleted by event log cleaner.
- private var attemptsToClean = new
mutable.ListBuffer[FsApplicationAttemptInfo]
+ private val listing: KVStore = storePath.map { path =>
+ val dbPath = new File(path, "listing.ldb")
- private val pendingReplayTasksCount = new
java.util.concurrent.atomic.AtomicInteger(0)
+ def openDB(): LevelDB = new LevelDB(dbPath, new
KVStoreScalaSerializer())
+
+ try {
+ val db = openDB()
+ val meta = db.getMetadata(classOf[KVStoreMetadata])
+
+ if (meta == null) {
+ db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION,
logDir))
+ db
+ } else if (meta.version != CURRENT_LISTING_VERSION ||
!logDir.equals(meta.logDir)) {
--- End diff --
Could we add a test case to check this logics works as expected? It sounds
like this is a completely new feature.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]