Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20011#discussion_r158569095 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.File +import java.nio.file.Files +import java.nio.file.attribute.PosixFilePermissions +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{HashMap, ListBuffer} + +import org.apache.commons.io.FileUtils + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.status.KVUtils._ +import org.apache.spark.util.{Clock, Utils} +import org.apache.spark.util.kvstore.KVStore + +/** + * A class used to keep track of disk usage by the SHS, allowing application data to be deleted + * from disk when usage exceeds a configurable threshold. + * + * The goal of the class is not to guarantee that usage will never exceed the threshold; because of + * how application data is written, disk usage may temporarily go higher. 
But, eventually, it + * should fall back under the threshold. + * + * @param conf Spark configuration. + * @param path Path where to store application data. + * @param listing The listing store, used to persist usage data. + * @param clock Clock instance to use. + */ +private class HistoryServerDiskManager( + conf: SparkConf, + path: File, + listing: KVStore, + clock: Clock) extends Logging { + + import config._ + + private val appStoreDir = new File(path, "apps") + if (!appStoreDir.isDirectory() && !appStoreDir.mkdir()) { + throw new IllegalArgumentException(s"Failed to create app directory ($appStoreDir).") + } + + private val tmpStoreDir = new File(path, "temp") + if (!tmpStoreDir.isDirectory() && !tmpStoreDir.mkdir()) { + throw new IllegalArgumentException(s"Failed to create temp directory ($tmpStoreDir).") + } + + private val maxUsage = conf.get(MAX_LOCAL_DISK_USAGE) + private val currentUsage = new AtomicLong(0L) + private val committedUsage = new AtomicLong(0L) + private val active = new HashMap[(String, Option[String]), Long]() + + def initialize(): Unit = { + updateUsage(sizeOf(appStoreDir), committed = true) + + // Clean up any temporary stores during start up. This assumes that they're leftover from other + // instances and are not useful. + tmpStoreDir.listFiles().foreach(FileUtils.deleteQuietly) + + // Go through the recorded store directories and remove any that may have been removed by + // external code. + val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { info => + !new File(info.path).exists() + }.toSeq + + orphans.foreach { info => + listing.delete(info.getClass(), info.path) + } + } + + /** + * Lease some space from the store. The leased space is calculated as a fraction of the given + * event log size; this is an approximation, and doesn't mean the application store cannot + * outgrow the lease. + * + * If there's not enough space for the lease, other applications might be evicted to make room. 
+ * This method always returns a lease, meaning that it's possible for local disk usage to grow + * past the configured threshold if there aren't enough idle applications to evict. + * + * While the lease is active, the data is written to a temporary location, so `openStore()` + * will still return `None` for the application. + */ + def lease(eventLogSize: Long, isCompressed: Boolean = false): Lease = { + val needed = approximateSize(eventLogSize, isCompressed) + makeRoom(needed) + + val perms = PosixFilePermissions.fromString("rwx------") + val tmp = Files.createTempDirectory(tmpStoreDir.toPath(), "appstore", + PosixFilePermissions.asFileAttribute(perms)).toFile() + + updateUsage(needed) + new Lease(tmp, needed) + } + + /** + * Returns the location of an application store if it's still available. Marks the store as + * being used so that it's not evicted when running out of designated space. + */ + def openStore(appId: String, attemptId: Option[String]): Option[File] = { + val storePath = active.synchronized { + val path = appStorePath(appId, attemptId) + if (path.isDirectory()) { + active(appId -> attemptId) = sizeOf(path) + Some(path) + } else { + None + } + } + + storePath.foreach { path => + updateAccessTime(appId, attemptId) + } + + storePath + } + + /** + * Tell the disk manager that the store for the given application is not being used anymore. + * + * @param delete Whether to delete the store from disk. + */ + def release(appId: String, attemptId: Option[String], delete: Boolean = false): Unit = { + // Because LevelDB may modify the structure of the store files even when just reading, update + // the accounting for this application when it's closed. 
+ val oldSizeOpt = active.synchronized { + active.remove(appId -> attemptId) + } + + oldSizeOpt.foreach { oldSize => + val path = appStorePath(appId, attemptId) + updateUsage(-oldSize, committed = true) + if (path.isDirectory()) { + if (delete) { + deleteStore(path) + } else { + val newSize = sizeOf(path) + val newInfo = listing.read(classOf[ApplicationStoreInfo], path.getAbsolutePath()) + .copy(size = newSize) + listing.write(newInfo) + updateUsage(newSize, committed = true) + } + } + } + } + + /** + * A non-scientific approximation of how large an app state store will be given the size of the + * event log. + */ + def approximateSize(eventLogSize: Long, isCompressed: Boolean): Long = { + if (isCompressed) { + // For compressed logs, assume that compression reduces the log size a lot, and the disk + // store will actually grow compared to the log size. + eventLogSize * 2 + } else { + // For non-compressed logs, assume the disk store will end up at approximately 50% of the + // size of the logs. This is loosely based on empirical evidence. + eventLogSize / 2 + } + } + + /** Current free space. Considers space currently leased out too. */ + def free(): Long = { + math.max(maxUsage - currentUsage.get(), 0L) + } + + /** Current committed space. 
*/ + def committed(): Long = committedUsage.get() + + private def deleteStore(path: File): Unit = { + FileUtils.deleteDirectory(path) + listing.delete(classOf[ApplicationStoreInfo], path.getAbsolutePath()) + } + + private def makeRoom(size: Long): Unit = { + if (free() < size) { + logDebug(s"Not enough free space, looking at candidates for deletion...") + val evicted = new ListBuffer[ApplicationStoreInfo]() + Utils.tryWithResource( + listing.view(classOf[ApplicationStoreInfo]).index("lastAccess").closeableIterator() + ) { iter => + var needed = size + while (needed > 0 && iter.hasNext()) { + val info = iter.next() + val isActive = active.synchronized { + active.contains(info.appId -> info.attemptId) + } + if (!isActive) { + evicted += info + needed -= info.size + } + } + } + + evicted.foreach { info => + logInfo(s"Deleting store for ${info.appId}/${info.attemptId}.") + deleteStore(new File(info.path)) + updateUsage(-info.size, committed = true) + } --- End diff -- Yeah, I'll add some more logs.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org