vanzin closed pull request #10225: [SPARK-12196][Core] Store/retrieve blocks from different speed storage devices by hierarchical way URL: https://github.com/apache/spark/pull/10225
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 4a15559e55cbd..171017546d847 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -125,7 +125,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException { assert (partitionWriters == null); if (!records.hasNext()) { partitionLengths = new long[numPartitions]; - shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null); + shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null, null); mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); return; } @@ -162,7 +162,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException { File tmp = Utils.tempFileWith(output); try { partitionLengths = writePartitionedFile(tmp); - shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); + shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp, output); } finally { if (tmp.exists() && !tmp.delete()) { logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 8a1771848dee6..131c261b46e32 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -221,7 +221,7 @@ void closeAndWriteOutput() throws IOException { } } } - shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); + shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp, output); } finally { if (tmp.exists() && !tmp.delete()) { logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 91858f0912b65..3f75a7fa9b3c0 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -137,7 +137,11 @@ private[spark] class IndexShuffleBlockResolver( shuffleId: Int, mapId: Int, lengths: Array[Long], - dataTmp: File): Unit = { + // SPARK-12196, when using tiered store, + // multiple call of getDataFile could return different value, + // so we pass both dataFileTmp and dataFile for renaming. + dataFileTmp: File, + dataFile: File): Unit = { val indexFile = getIndexFile(shuffleId, mapId) val indexTmp = Utils.tempFileWith(indexFile) try { @@ -154,17 +158,17 @@ private[spark] class IndexShuffleBlockResolver( out.close() } - val dataFile = getDataFile(shuffleId, mapId) // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure // the following check and rename are atomic. synchronized { - val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length) + val existingLengths = checkIndexAndDataFile(getIndexFile(shuffleId, mapId), + getDataFile(shuffleId, mapId), lengths.length) if (existingLengths != null) { // Another attempt for the same task has already written our map outputs successfully, // so just use the existing partition lengths and delete our temporary map outputs. 
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length) - if (dataTmp != null && dataTmp.exists()) { - dataTmp.delete() + if (dataFileTmp != null && dataFileTmp.exists()) { + dataFileTmp.delete() } indexTmp.delete() } else { @@ -173,14 +177,15 @@ private[spark] class IndexShuffleBlockResolver( if (indexFile.exists()) { indexFile.delete() } - if (dataFile.exists()) { + if (dataFile != null && dataFile.exists()) { dataFile.delete() } if (!indexTmp.renameTo(indexFile)) { throw new IOException("fail to rename file " + indexTmp + " to " + indexFile) } - if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) { - throw new IOException("fail to rename file " + dataTmp + " to " + dataFile) + if (dataFile != null && dataFileTmp != null + && dataFileTmp.exists() && !dataFileTmp.renameTo(dataFile)) { + throw new IOException("fail to rename file " + dataFileTmp + " to " + dataFile) } } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 636b88e792bf3..1acf8d5e840fe 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -70,7 +70,8 @@ private[spark] class SortShuffleWriter[K, V, C]( try { val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) val partitionLengths = sorter.writePartitionedFile(blockId, tmp) - shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) + shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, + partitionLengths, tmp, output) mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } finally { if (tmp.exists() && !tmp.delete()) { diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 
3d43e3c367aac..ce7555cf6e68e 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -44,38 +44,41 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea logError("Failed to create any local dir.") System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } - // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content - // of subDirs(i) is protected by the lock of subDirs(i) - private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) private val shutdownHook = addShutdownHook() - /** Looks up a file by hashing it into one of our local subdirectories. */ - // This method should be kept in sync with - // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile(). - def getFile(filename: String): File = { - // Figure out which local directory it hashes to, and which subdirectory in that - val hash = Utils.nonNegativeHash(filename) - val dirId = hash % localDirs.length - val subDirId = (hash / localDirs.length) % subDirsPerLocalDir - - // Create the subdirectory if it doesn't already exist - val subDir = subDirs(dirId).synchronized { - val old = subDirs(dirId)(subDirId) - if (old != null) { - old - } else { - val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) - if (!newDir.exists() && !newDir.mkdir()) { - throw new IOException(s"Failed to create local dir in $newDir.") - } - subDirs(dirId)(subDirId) = newDir - newDir - } + private val shortFileAllocatorNames = Map( + "hash" -> classOf[HashAllocator].getName, + "tiered" -> classOf[TieredAllocator].getName) + private def createFileAllcator(allocatorName: String): FileAllocator = { + val allocatorClass = shortFileAllocatorNames + .getOrElse(allocatorName.toLowerCase, allocatorName) + val allocator = try { + val clazz = Utils.classForName(allocatorClass) + val ctor = 
clazz.getConstructor(classOf[SparkConf], + classOf[Array[File]], java.lang.Integer.TYPE) + Some(ctor.newInstance(conf, localDirs, new Integer(subDirsPerLocalDir)) + .asInstanceOf[FileAllocator]) + } catch { + case _: ClassNotFoundException => + logWarning(s"Could not find class for fileAllocator($allocatorClass).") + None + case _: Exception => + logWarning(s"Fail to initialize fileAllocator($allocatorClass).") + None + } + if (allocator.isDefined && allocator.get.support) { + logInfo(s"fileAllocator($allocatorClass) is enabled.") + allocator.get + } else { + logWarning("Default fileAllocator(HashAllocator) is enabled instead.") + new HashAllocator(conf, localDirs, subDirsPerLocalDir) } - - new File(subDir, filename) } + private val allocatorName = conf.get("spark.diskStore.allocator", "hash") + private val fileAllocator: FileAllocator = createFileAllcator(allocatorName) + + def getFile(filename: String): File = fileAllocator(filename) def getFile(blockId: BlockId): File = getFile(blockId.name) @@ -87,7 +90,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea /** List all the files currently stored on disk by the disk manager. */ def getAllFiles(): Seq[File] = { // Get all the files inside the array of array of directories - subDirs.flatMap { dir => + fileAllocator.subDirs.flatMap { dir => dir.synchronized { // Copy the content of dir because it may be modified in other threads dir.clone() diff --git a/core/src/main/scala/org/apache/spark/storage/FileAllocator.scala b/core/src/main/scala/org/apache/spark/storage/FileAllocator.scala new file mode 100644 index 0000000000000..124f5b9da5eff --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/FileAllocator.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.io.{File, IOException} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[storage] abstract class FileAllocator( + conf: SparkConf, + localDirs: Array[File], + subDirsPerLocalDir: Int) + extends Logging { + // The content of subDirs is immutable but the content of subDirs(i) is mutable. 
And the content + // of subDirs(i) is protected by the lock of subDirs(i) + val subDirs: Array[Array[File]] = + Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) + + def support: Boolean = true + + def apply(filename: String): File + + protected def getFile(filename: String, storageDirs: Array[File]): File = { + require(storageDirs.nonEmpty, "could not find file when the directories are empty") + + // Figure out which local directory it hashes to, and which subdirectory in that + val hash = Utils.nonNegativeHash(filename) + val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length)) + val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir + + // Create the subdirectory if it doesn't already exist + val subDir = subDirs(dirId).synchronized { + val old = subDirs(dirId)(subDirId) + if (old != null) { + old + } else { + val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) + if (!newDir.exists() && !newDir.mkdir()) { + throw new IOException(s"Failed to create local dir in $newDir.") + } + subDirs(dirId)(subDirId) = newDir + newDir + } + } + + new File(subDir, filename) + } +} + +/** Looks up a file by hashing it into one of our local subdirectories. */ +// This method should be kept in sync with +// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile(). +private[storage] class HashAllocator( + conf: SparkConf, + localDirs: Array[File], + subDirsPerLocalDir: Int) + extends FileAllocator( + conf, + localDirs, + subDirsPerLocalDir) { + def apply(filename: String): File = getFile(filename, localDirs) +} + +/** Looks up a file by tier way in different speed storage devices. 
*/ +private[storage] class TieredAllocator( + conf: SparkConf, + localDirs: Array[File], + subDirsPerLocalDir: Int) + extends FileAllocator( + conf, + localDirs, + subDirsPerLocalDir) { + private val tiersEnvConf = conf.getenv("SPARK_DIRS_TIERS") + private val threshold = conf.getDouble("spark.diskStore.tiered.threshold", 0.15) + + if (tiersEnvConf == null) { + logAndThrowException("SPARK_DIRS_TIERS is not set.") + } + private val tiersIDs = tiersEnvConf.trim.split("") + + if(localDirs.length != tiersIDs.length) { + logAndThrowException(s"Incorrect SPARK_DIRS_TIERS setting, " + + s"SPARK_DIRS_TIERS = '$tiersEnvConf'.") + } + private val tieredDirs: Seq[Array[File]] = (localDirs zip tiersIDs). + groupBy(_._2).mapValues(_.map(_._1)).toSeq.sortBy(_._1).map(_._2) + tieredDirs.zipWithIndex.foreach { + case (dirs, index) => + logInfo(s"Tier $index:") + dirs.foreach(d => logInfo(s" $d")) + } + + override def support: Boolean = { + // TODO: tiered allocation for external shuffle service will be supported in another PR + !conf.getBoolean("spark.shuffle.service.enabled", defaultValue = false) + } + + def hasEnoughSpace(file: File): Boolean = { + file.getParentFile.getFreeSpace * 1.0 / file.getParentFile.getTotalSpace >= threshold + } + + def apply(filename: String): File = { + var file: File = null + var availableFile: File = null + for (tier <- tieredDirs) { + file = getFile(filename, tier) + if (file.exists()) return file + + if (availableFile == null && hasEnoughSpace(file)) { + availableFile = file + } + } + Option(availableFile).getOrElse(file) + } + + private def logAndThrowException(msg: String) = { + logWarning(msg) + throw new IllegalArgumentException(msg) + } +} diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 088b68132d905..9a7ac77d485c9 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -141,12 +141,14 @@ public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Th public Void answer(InvocationOnMock invocationOnMock) throws Throwable { partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2]; File tmp = (File) invocationOnMock.getArguments()[3]; - mergedOutputFile.delete(); - tmp.renameTo(mergedOutputFile); + File dataFile = (File) invocationOnMock.getArguments()[4]; + dataFile.delete(); + tmp.renameTo(dataFile); return null; } }).when(shuffleBlockResolver) - .writeIndexFileAndCommit(anyInt(), anyInt(), any(long[].class), any(File.class)); + .writeIndexFileAndCommit(anyInt(), anyInt(), any(long[].class), + any(File.class), any(File.class)); when(diskBlockManager.createTempShuffleBlock()).thenAnswer( new Answer<Tuple2<TempShuffleBlockId, File>>() { diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index 85ccb33471048..e2c5fdbca3eae 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -72,14 +72,16 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte doAnswer(new Answer[Void] { def answer(invocationOnMock: InvocationOnMock): Void = { val tmp: File = invocationOnMock.getArguments()(3).asInstanceOf[File] - if (tmp != null) { - outputFile.delete - tmp.renameTo(outputFile) + val dataFile: File = invocationOnMock.getArguments()(4).asInstanceOf[File] + if (tmp != null && dataFile != null) { + dataFile.delete + tmp.renameTo(dataFile) } null } }).when(blockResolver) - .writeIndexFileAndCommit(anyInt, anyInt, any(classOf[Array[Long]]), any(classOf[File])) + .writeIndexFileAndCommit(anyInt, anyInt, 
any(classOf[Array[Long]]), + any(classOf[File]), any(classOf[File])) when(blockManager.diskBlockManager).thenReturn(diskBlockManager) when(blockManager.getDiskWriter( any[BlockId], diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala index d21ce73f4021e..36660ffbeb13d 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala @@ -65,15 +65,19 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa test("commit shuffle files multiple times") { val resolver = new IndexShuffleBlockResolver(conf, blockManager) + val shuffleId = 1 + val mapId = 2 + val lengths = Array[Long](10, 0, 20) - val dataTmp = File.createTempFile("shuffle", null, tempDir) + val data = resolver.getDataFile(shuffleId, mapId) + val dataTmp = Utils.tempFileWith(data) val out = new FileOutputStream(dataTmp) Utils.tryWithSafeFinally { out.write(new Array[Byte](30)) } { out.close() } - resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp) + resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp, data) val dataFile = resolver.getDataFile(1, 2) assert(dataFile.exists()) @@ -81,7 +85,8 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa assert(!dataTmp.exists()) val lengths2 = new Array[Long](3) - val dataTmp2 = File.createTempFile("shuffle", null, tempDir) + val data2 = resolver.getDataFile(shuffleId, mapId) + val dataTmp2 = Utils.tempFileWith(data2) val out2 = new FileOutputStream(dataTmp2) Utils.tryWithSafeFinally { out2.write(Array[Byte](1)) @@ -89,7 +94,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa } { out2.close() } - resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2) + resolver.writeIndexFileAndCommit(1, 2, lengths2, 
dataTmp2, data2) assert(lengths2.toSeq === lengths.toSeq) assert(dataFile.exists()) assert(dataFile.length() === 30) @@ -109,7 +114,8 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa dataFile.delete() val lengths3 = Array[Long](10, 10, 15) - val dataTmp3 = File.createTempFile("shuffle", null, tempDir) + val data3 = resolver.getDataFile(shuffleId, mapId) + val dataTmp3 = Utils.tempFileWith(data3) val out3 = new FileOutputStream(dataTmp3) Utils.tryWithSafeFinally { out3.write(Array[Byte](2)) @@ -117,7 +123,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa } { out3.close() } - resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3) + resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3, data3) assert(lengths3.toSeq != lengths.toSeq) assert(dataFile.exists()) assert(dataFile.length() === 35) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerTieredSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerTieredSuite.scala new file mode 100644 index 0000000000000..535acc8cb11d4 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerTieredSuite.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.io.{File, FileWriter} + +import scala.collection.mutable +import scala.language.reflectiveCalls + +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.util.{SparkConfWithEnv, Utils} + +object DiskBlockManagerTieredSuite { + val SPARK_DIRS_TIERS_CONF = "000011112222" + case class mockDiskAndTier(diskSpaceStatus: Array[Boolean], expectedTiers: Array[Int]) + val mockTestsSeq = Array( + mockDiskAndTier(Array(true, true, true), Array(0, 0, 0)), + mockDiskAndTier(Array(true, true, false, true, false, false, true, false, false, false), + Array(0, 0, 1, 2, 2)), + mockDiskAndTier(Array(false, false, false, false, false, false, false, false, false), + Array(2, 2, 2)), + mockDiskAndTier(Array(true, false, true, false, false, true, + false, false, true, true, false, true, true, false, false, false), + Array(0, 1, 2, 2, 0, 1, 0, 2)) + ) + + // this class override hasEnoughSpace in the TieredAllocator to mock disk space change + class MockTieredAllocator( + conf: SparkConf, + localDirs: Array[File], + subDirsPerLocalDir: Int) + extends TieredAllocator( + conf, + localDirs, + subDirsPerLocalDir) { + private val caseNum = conf.getInt("spark.testTieredStorage.caseNum", 0) + private val diskStatus = mockTestsSeq(caseNum).diskSpaceStatus + + private var index = -1 + override def hasEnoughSpace(file: File): Boolean = { + index += 1 + if (index < diskStatus.length) { + diskStatus(index) + } + else { + true + } + } + } +} + +class DiskBlockManagerTieredSuite + extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll { + import org.apache.spark.storage.DiskBlockManagerTieredSuite._ + + private val testConf = new SparkConfWithEnv(Map("SPARK_DIRS_TIERS" -> SPARK_DIRS_TIERS_CONF)) + private var rootDirs: Seq[File] = _ + + var 
diskBlockManager: DiskBlockManager = _ + + override def beforeAll() { + super.beforeAll() + rootDirs = (0 until SPARK_DIRS_TIERS_CONF.length).map(_ => Utils.createTempDir()) + testConf.set("spark.local.dir", rootDirs.map(_.getAbsolutePath).mkString(",")) + } + + override def afterAll() { + try { + rootDirs.foreach(Utils.deleteRecursively) + } finally { + super.afterAll() + } + } + + override def afterEach() { + try { + diskBlockManager.stop() + } finally { + super.afterEach() + } + } + + def testTieredStorage(caseNum: Int): Unit = { + testConf.set("spark.diskStore.allocator", classOf[MockTieredAllocator].getName) + .set("spark.testTieredStorage.caseNum", caseNum.toString) + diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true) + + val createdBlocks = mutable.ArrayBuffer[TestBlockId]() + val rootDirsPath = rootDirs.map(_.getCanonicalPath) + val tierSeq = mockTestsSeq(caseNum).expectedTiers + val tiersConf = SPARK_DIRS_TIERS_CONF + assert(!tierSeq.exists(s => s < 0 || s > tiersConf(tiersConf.length - 1).toInt)) + tierSeq.zipWithIndex.foreach { + case (i, idx) => + val blockId = TestBlockId("test" + idx) + createdBlocks += blockId + val file = diskBlockManager.getFile(blockId) + writeToFile(file, 10) + val path = rootDirsPath.find(file.getCanonicalPath.startsWith(_)) + val j = rootDirsPath.indexOf(path.get) / 4 + assert(i == j) + } + + createdBlocks.foreach { + b => assert(diskBlockManager.containsBlock(b)) + } + } + + test("basic block creation with hash allocator") { + testConf.set("spark.diskStore.allocator", "hash") + diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true) + val blockId = TestBlockId("test") + val newFile = diskBlockManager.getFile(blockId) + writeToFile(newFile, 10) + assert(diskBlockManager.containsBlock(blockId)) + newFile.delete() + assert(!diskBlockManager.containsBlock(blockId)) + } + + test("basic block creation with tiered allocator") { + testConf.set("spark.diskStore.allocator", "tiered") + 
diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true) + // repeat 20 times and make sure the blocks are only allocated in tie 0 + (0 until 100).foreach { + i => + val blockId = TestBlockId("test" + i) + val file = diskBlockManager.getFile(blockId) + val rootDirsPath = rootDirs.map(_.getCanonicalPath) + val path = rootDirsPath.find(file.getCanonicalPath.startsWith(_)) + val j = rootDirsPath.indexOf(path.get) / 4 + assert(j == 0) + writeToFile(file, 10) + assert(diskBlockManager.containsBlock(blockId)) + file.delete() + assert(!diskBlockManager.containsBlock(blockId)) + } + } + + test("tiered storage: enough disk space, all in tier 0") { + testTieredStorage(0) + } + + test("tiered storage: in all ties including 0, 1, 2...") { + testTieredStorage(1) + } + + test("tiered storage: no disk space, all in the last tier") { + testTieredStorage(2) + } + + test("tiered storage: disks space keep changing") { + testTieredStorage(3) + } + + def writeToFile(file: File, numBytes: Int) { + val writer = new FileWriter(file, true) + for (i <- 0 until numBytes) writer.write(i) + writer.close() + } +} diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 868c2edc5a463..8e3bc417751ed 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -240,6 +240,10 @@ private[yarn] class ExecutorRunnable( YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) } + // Tiered storage config + val tiersConf = conf.get("yarn.nodemanager.spark-dirs-tiers").trim + env("SPARK_DIRS_TIERS") = tiersConf + // lookup appropriate http scheme for container log urls val yarnHttpPolicy = conf.get( YarnConfiguration.YARN_HTTP_POLICY_KEY, ---------------------------------------------------------------- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org