GitHub user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20011#discussion_r158558556
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/DiskStoreManager.scala ---
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermissions
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.status.KVUtils._
+import org.apache.spark.util.{Clock, Utils}
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * A class used to keep track of disk usage by the SHS, allowing application data to be deleted
+ * from disk when usage exceeds a configurable threshold.
+ *
+ * The goal of the class is not to guarantee that usage will never exceed the threshold; because of
+ * how application data is written, disk usage may temporarily go higher. But, eventually, it
+ * should fall back under the threshold.
+ *
+ * @param conf Spark configuration.
+ * @param path Path where to store application data.
+ * @param listing The listing store, used to persist usage data.
+ * @param clock Clock instance to use.
+ */
+private class DiskStoreManager(
+ conf: SparkConf,
+ path: File,
+ listing: KVStore,
+ clock: Clock) extends Logging {
+
+ import config._
+
+ private val appStoreDir = new File(path, "apps")
+ if (!appStoreDir.isDirectory() && !appStoreDir.mkdir()) {
+ throw new IllegalArgumentException(s"Failed to create app directory ($appStoreDir).")
+ }
+
+ private val tmpStoreDir = new File(path, "temp")
+ if (!tmpStoreDir.isDirectory() && !tmpStoreDir.mkdir()) {
+ throw new IllegalArgumentException(s"Failed to create temp directory ($tmpStoreDir).")
+ }
+
+ private val maxUsage = conf.get(MAX_LOCAL_DISK_USAGE)
+ private val currentUsage = new AtomicLong(0L)
+ private val active = new HashMap[(String, Option[String]), Long]()
+
+ def initialize(): Unit = {
+ updateUsage(sizeOf(appStoreDir))
+
+ // Clean up any temporary stores during start up. This assumes that they're leftover from other
+ // instances and are not useful.
+ tmpStoreDir.listFiles().foreach(FileUtils.deleteQuietly)
+
+ // Go through the recorded store directories and remove any that may have been removed by
+ // external code.
+ val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { info =>
+ !new File(info.path).exists()
+ }.toSeq
+
+ orphans.foreach { info =>
+ listing.delete(info.getClass(), info.path)
+ }
+ }
+
+ /**
+ * Lease some space from the store. The leased space is calculated as a fraction of the given
+ * event log size; this is an approximation, and doesn't mean the application store cannot
+ * outgrow the lease.
+ *
+ * If there's not enough space for the lease, other applications might be evicted to make room.
+ * This method always returns a lease, meaning that it's possible for local disk usage to grow
+ * past the configured threshold if there aren't enough idle applications to evict.
+ *
+ * While the lease is active, the data is written to a temporary location, so `openStore()`
+ * will still return `None` for the application.
+ */
+ def lease(eventLogSize: Long, isCompressed: Boolean = false): Lease = {
+ val needed = approximateSize(eventLogSize, isCompressed)
+ makeRoom(needed)
+
+ val perms = PosixFilePermissions.fromString("rwx------")
+ val tmp = Files.createTempDirectory(tmpStoreDir.toPath(), "appstore",
+ PosixFilePermissions.asFileAttribute(perms)).toFile()
+
+ updateUsage(needed)
+ new Lease(tmp, needed)
+ }
+
+ /**
+ * Returns the location of an application store if it's still available. Marks the store as
+ * being used so that it's not evicted when running out of designated space.
+ */
+ def openStore(appId: String, attemptId: Option[String]): Option[File] = {
+ val storePath = active.synchronized {
+ val path = appStorePath(appId, attemptId)
+ if (path.isDirectory()) {
+ active(appId -> attemptId) = sizeOf(path)
+ Some(path)
+ } else {
+ None
+ }
+ }
+
+ storePath.foreach { path =>
+ updateAccessTime(appId, attemptId)
+ }
+
+ storePath
+ }
+
+ /**
+ * Tell the disk manager that the store for the given application is not being used anymore.
+ *
+ * @param delete Whether to delete the store from disk.
+ */
+ def release(appId: String, attemptId: Option[String], delete: Boolean = false): Unit = {
+ // Because LevelDB may modify the structure of the store files even when just reading, update
+ // the accounting for this application when it's closed.
+ val oldSizeOpt = active.synchronized {
+ active.remove(appId -> attemptId)
+ }
+
+ oldSizeOpt.foreach { oldSize =>
+ val path = appStorePath(appId, attemptId)
+ updateUsage(-oldSize)
+ if (path.isDirectory()) {
+ if (delete) {
+ FileUtils.deleteDirectory(path)
+ listing.delete(classOf[ApplicationStoreInfo], path.getAbsolutePath())
+ } else {
+ updateUsage(sizeOf(path))
+ }
+ }
+ }
+ }
+
+ /**
+ * A non-scientific approximation of how large an app state store will be given the size of the
+ * event log.
+ */
+ def approximateSize(eventLogSize: Long, isCompressed: Boolean): Long = {
+ val expectedSize = if (isCompressed) {
+ // For compressed logs, assume that compression reduces the log size a lot, and the disk
+ // store will actually grow compared to the log size.
+ eventLogSize * 2
+ } else {
+ // For non-compressed logs, assume the disk store will end up at approximately 50% of the
+ // size of the logs. This is loosely based on empirical evidence.
+ eventLogSize / 2
+ }
+
+ // Cap the value at 10% of the max size; this assumes that element cleanup will put a cap on
+ // how large the disk store can get, which may not always be the case.
+ math.min(expectedSize, maxUsage / 10)
+ }
+
+ /** Current free space. Considers space currently leased out too. */
+ def free(): Long = {
+ math.max(maxUsage - currentUsage.get(), 0L)
+ }
+
+ private def makeRoom(size: Long): Unit = {
+ if (free() < size) {
+ logDebug(s"Not enough free space, looking at candidates for deletion...")
+ val evicted = new ListBuffer[ApplicationStoreInfo]()
+ Utils.tryWithResource(
+ listing.view(classOf[ApplicationStoreInfo]).index("lastAccess").closeableIterator()
+ ) { iter =>
+ var needed = size
+ while (needed > 0 && iter.hasNext()) {
+ val info = iter.next()
+ val isActive = active.synchronized {
+ active.contains(info.appId -> info.attemptId)
+ }
+ if (!isActive) {
+ evicted += info
+ needed -= info.size
+ }
+ }
+ }
+
+ evicted.foreach { info =>
+ logInfo(s"Deleting store for ${info.appId}/${info.attemptId}.")
+ FileUtils.deleteDirectory(new File(info.path))
+ listing.delete(info.getClass(), info.path)
+ }
+ logDebug(s"Deleted a total of ${evicted.size} app stores.")
+ }
+ }
+
+ private def appStorePath(appId: String, attemptId: Option[String]): File = {
+ val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb"
+ new File(appStoreDir, fileName)
+ }
+
+ private def updateAccessTime(appId: String, attemptId: Option[String]): Unit = {
+ val path = appStorePath(appId, attemptId)
+ val info = ApplicationStoreInfo(path.getAbsolutePath(), clock.getTimeMillis(), appId, attemptId,
+ sizeOf(path))
+ listing.write(info)
+ }
+
+ private def updateUsage(delta: Long): Long = {
+ val updated = currentUsage.addAndGet(delta)
+ if (updated < 0) {
+ throw new IllegalStateException(
+ s"Disk usage tracker went negative (now = $updated, delta = $delta)")
+ }
+ updated
+ }
+
+ /** Visible for testing. Return the size of a directory. */
+ private[history] def sizeOf(path: File): Long = FileUtils.sizeOf(path)
+
+ private[history] class Lease(val path: File, private val leased: Long) {
+
+ /**
+ * Commits a lease to its final location, and update accounting information. This method
+ * marks the application as active, so its store is not available for eviction.
+ */
+ def commit(appId: String, attemptId: Option[String]): File = {
+ val dst = appStorePath(appId, attemptId)
+
+ active.synchronized {
+ require(!active.contains(appId -> attemptId),
+ s"Cannot commit lease for active application $appId / $attemptId")
--- End diff ---
This is more of a sanity check that the situation you describe does not
happen. The SHS code should already ensure that only a single thread parses
the logs for an application, and that the app's UI is not loaded while that
happens. This `require` just asserts that this is the case.
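To illustrate the kind of invariant that `require` guards, here is a minimal
sketch in Scala. `ActiveStoreTracker` and its methods are hypothetical names
for illustration only, not part of this PR; the sketch models only the
`active` bookkeeping (no leases, disk accounting, or LevelDB), assuming that
"active" means "some reader, such as a loaded UI, has the store open".

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the check in Lease.commit(). It is NOT
// the DiskStoreManager from the PR; it only tracks which (appId, attemptId)
// pairs currently have their store open.
class ActiveStoreTracker {
  private val active = mutable.HashSet[(String, Option[String])]()

  /** Mark a store as in use, e.g. because its UI has been loaded. */
  def markOpen(appId: String, attemptId: Option[String]): Unit = active.synchronized {
    val key = (appId, attemptId)
    active += key
  }

  /** Mark a store as no longer in use. */
  def release(appId: String, attemptId: Option[String]): Unit = active.synchronized {
    val key = (appId, attemptId)
    active -= key
  }

  def isActive(appId: String, attemptId: Option[String]): Boolean = active.synchronized {
    active.contains((appId, attemptId))
  }

  /**
   * Commit freshly parsed data for an application. Callers are expected to
   * guarantee that nobody has the store open; `require` does not enforce that
   * protocol, it only fails fast (IllegalArgumentException) if it was broken.
   */
  def commit(appId: String, attemptId: Option[String]): Unit = active.synchronized {
    val key = (appId, attemptId)
    require(!active.contains(key),
      s"Cannot commit lease for active application $appId / $attemptId")
    // Mirrors the doc comment above: the committed store becomes active.
    active += key
  }
}
```

For example, calling `markOpen("app-2", Some("1"))` and then
`commit("app-2", Some("1"))` throws an `IllegalArgumentException` whose message
is "requirement failed: Cannot commit lease for active application app-2 /
Some(1)". If the SHS keeps its single-parser, UI-not-loaded guarantee, that
branch is never reached.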