Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20011#discussion_r157641501
--- Diff:
core/src/main/scala/org/apache/spark/deploy/history/DiskStoreManager.scala ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermissions
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.status.KVUtils._
+import org.apache.spark.util.{Clock, Utils}
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * A class used to keep track of disk usage by the Spark History Server
+ * (SHS), allowing application data to be deleted from disk when usage
+ * exceeds a configurable threshold.
+ *
+ * The goal of the class is not to guarantee that usage will never exceed
+ * the threshold; because of how application data is written, disk usage
+ * may temporarily go higher. Eventually, though, it should fall back under
+ * the threshold.
+ *
+ * @param conf Spark configuration.
+ * @param path Directory in which application data is stored.
+ * @param listing The listing store, used to persist usage data.
+ * @param clock Clock instance to use.
+ */
+private class DiskStoreManager(
+ conf: SparkConf,
+ path: File,
+ listing: KVStore,
+ clock: Clock) extends Logging {
+
+ import config._
+
+ // Directory named "apps" under `path`. Construction fails with an
+ // IllegalArgumentException if it is not already a directory and cannot be
+ // created.
+ private val appStoreDir = new File(path, "apps")
+ if (!appStoreDir.isDirectory() && !appStoreDir.mkdir()) {
+ throw new IllegalArgumentException(s"Failed to create app directory
($appStoreDir).")
+ }
+
+ // Directory named "temp" under `path`, with the same create-or-fail check.
+ // lease() creates its temporary store directories in here.
+ private val tmpStoreDir = new File(path, "temp")
+ if (!tmpStoreDir.isDirectory() && !tmpStoreDir.mkdir()) {
+ throw new IllegalArgumentException(s"Failed to create temp directory
($tmpStoreDir).")
+ }
+
+ // Multiplier applied to an event log's size by approximateSize().
+ private val eventLogSizeRatio = conf.get(EVENT_TO_STORE_SIZE_RATIO)
+ // Read from MAX_LOCAL_DISK_USAGE; presumably the configurable usage
+ // threshold -- confirm against the config definition.
+ private val maxUsage = conf.get(MAX_LOCAL_DISK_USAGE)
+ // Usage counter, starting at zero. NOTE(review): presumably adjusted by
+ // updateUsage(), which is not visible in this excerpt -- confirm.
+ private val currentUsage = new AtomicLong(0L)
+ // Stores currently open, keyed by (appId, attemptId); the value is the size
+ // recorded by openStore(). Accessed under `active.synchronized`.
+ private val active = new HashMap[(String, Option[String]), Long]()
+
+ /**
+ * Prepares the manager for use: seeds the usage accounting with the size
+ * of the app store directory, deletes everything found under the temp
+ * directory, and removes listing entries whose store directory no longer
+ * exists on disk.
+ */
+ def initialize(): Unit = {
+   // Account for whatever already sits in the app store directory.
+   updateUsage(sizeOf(appStoreDir))
+
+   // Clean up any temporary stores during start up. This assumes that
+   // they're leftover from other instances and are not useful.
+   // `File.listFiles()` returns null (not an empty array) when the
+   // directory cannot be listed, so guard against that instead of failing
+   // start-up with a NullPointerException.
+   Option(tmpStoreDir.listFiles()).foreach { leftovers =>
+     leftovers.foreach(FileUtils.deleteQuietly)
+   }
+
+   // Go through the recorded store directories and drop the entries whose
+   // directory may have been removed by external code.
+   val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { info =>
+     !new File(info.path).exists()
+   }.toSeq
+
+   orphans.foreach { info =>
+     listing.delete(info.getClass(), info.path)
+   }
+ }
+
+ /**
+ * Lease some space from the store. The leased space is calculated as a
+ * fraction of the given event log size; this is an approximation, and
+ * doesn't mean the application store cannot outgrow the lease.
+ *
+ * If there's not enough space for the lease, other applications might be
+ * evicted to make room. This method always returns a lease, meaning that
+ * it's possible for local disk usage to grow past the configured threshold
+ * if there aren't enough idle applications to evict.
+ *
+ * While the lease is active, the data is written to a temporary location,
+ * so `openStore()` will still return `None` for the application.
+ */
+ def lease(eventLogSize: Long): Lease = {
+   // Estimate the space the new store will need and try to free that much.
+   val leasedBytes = approximateSize(eventLogSize)
+   makeRoom(leasedBytes)
+
+   // The temporary store directory is created with owner-only permissions.
+   val ownerOnly = PosixFilePermissions.asFileAttribute(
+     PosixFilePermissions.fromString("rwx------"))
+   val scratchDir = Files
+     .createTempDirectory(tmpStoreDir.toPath(), "appstore", ownerOnly)
+     .toFile()
+
+   // Record the leased bytes only after the directory has been created.
+   updateUsage(leasedBytes)
+   new Lease(scratchDir, leasedBytes)
+ }
+
+ /**
+ * Returns whether there's enough free space to create a store for an
+ * application event log. This uses an approximation of the expected size
+ * of an application store given the size of the event log, since there's
+ * no way to really know that relationship up front.
+ */
+ def hasFreeSpace(eventLogSize: Long): Boolean = {
+   // Estimate the store size first, then compare it with what free()
+   // reports.
+   val estimatedStoreSize = approximateSize(eventLogSize)
+   free() >= estimatedStoreSize
+ }
+
+ /**
+ * Returns the location of an application store if it's still available.
+ * Marks the store as being used so that it's not evicted when running out
+ * of designated space.
+ */
+ def openStore(appId: String, attemptId: Option[String]): Option[File] = {
+   // The directory check and the bookkeeping in `active` happen under the
+   // same lock.
+   val opened = active.synchronized {
+     val storeDir = appStorePath(appId, attemptId)
+     if (!storeDir.isDirectory()) {
+       None
+     } else {
+       // Remember the size at open time; release() reads it back.
+       active.update((appId, attemptId), sizeOf(storeDir))
+       Some(storeDir)
+     }
+   }
+
+   // Done outside the lock, and only when a store was actually found.
+   if (opened.isDefined) {
+     updateAccessTime(appId, attemptId)
+   }
+
+   opened
+ }
+
+ /**
+ * Tell the disk manager that the store for the given application is not
+ * being used anymore.
+ *
+ * @param delete Whether to delete the store from disk.
+ */
+ def release(appId: String, attemptId: Option[String], delete: Boolean = false): Unit = {
+   // Because LevelDB may modify the structure of the store files even when
+   // just reading, the accounting for this application is refreshed when
+   // it's closed.
+   val recordedSize = active.synchronized {
+     active.remove((appId, attemptId))
+   }
+
+   // Nothing happens when the store was not registered as active.
+   for (previousSize <- recordedSize) {
+     val storeDir = appStorePath(appId, attemptId)
+     // Take back the size recorded at open time; the current size is added
+     // again below if the store is kept.
+     updateUsage(-previousSize)
+
+     val stillOnDisk = storeDir.isDirectory()
+     if (stillOnDisk && delete) {
+       FileUtils.deleteDirectory(storeDir)
+       listing.delete(classOf[ApplicationStoreInfo], storeDir.getAbsolutePath())
+     } else if (stillOnDisk) {
+       updateUsage(sizeOf(storeDir))
+     }
+   }
+ }
+
+ /**
+ * A non-scientific approximation of how large an app state store will be
+ * given the size of the event log. By default it's 30% of the event log
+ * size.
+ */
+ private def approximateSize(eventLogSize: Long): Long = {
+ math.ceil(eventLogSizeRatio * eventLogSize).toLong
--- End diff --
I'll probably need to look at a better heuristic here. This is probably not
a good approximation if the logs are compressed, and #20013 also affects this.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]