attilapiros commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r434546046
##########
File path:
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -148,6 +170,86 @@ private[spark] class IndexShuffleBlockResolver(
}
}
+ /**
+ * Write a provided shuffle block as a stream. Used for block migrations.
+ * ShuffleBlockBatchIds must contain the full range represented in the
ShuffleIndexBlock.
+ * Requires the caller to delete any shuffle index blocks where the shuffle
block fails to
+ * put.
+ */
+ override def putShuffleBlockAsStream(blockId: BlockId, serializerManager:
SerializerManager):
+ StreamCallbackWithID = {
+ val file = blockId match {
+ case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+ getIndexFile(shuffleId, mapId)
Review comment:
Nit: indent (there is an extra space):
```suggestion
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
getIndexFile(shuffleId, mapId)
```
##########
File path:
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -148,6 +170,86 @@ private[spark] class IndexShuffleBlockResolver(
}
}
+ /**
+ * Write a provided shuffle block as a stream. Used for block migrations.
+ * ShuffleBlockBatchIds must contain the full range represented in the
ShuffleIndexBlock.
+ * Requires the caller to delete any shuffle index blocks where the shuffle
block fails to
+ * put.
+ */
+ override def putShuffleBlockAsStream(blockId: BlockId, serializerManager:
SerializerManager):
+ StreamCallbackWithID = {
+ val file = blockId match {
+ case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+ getIndexFile(shuffleId, mapId)
+ case ShuffleDataBlockId(shuffleId, mapId, _) =>
+ getDataFile(shuffleId, mapId)
+ case _ =>
+ throw new Exception(s"Unexpected shuffle block transfer ${blockId} as
" +
+ s"${blockId.getClass().getSimpleName()}")
+ }
+ val fileTmp = Utils.tempFileWith(file)
+ val channel = Channels.newChannel(
+ serializerManager.wrapStream(blockId,
+ new FileOutputStream(fileTmp)))
+
+ new StreamCallbackWithID {
+
+ override def getID: String = blockId.name
+
+ override def onData(streamId: String, buf: ByteBuffer): Unit = {
+ while (buf.hasRemaining) {
+ channel.write(buf)
+ }
+ }
+
+ override def onComplete(streamId: String): Unit = {
+ logTrace(s"Done receiving block $blockId, now putting into local
shuffle service")
+ channel.close()
+ val diskSize = fileTmp.length()
+ this.synchronized {
+ if (file.exists()) {
+ file.delete()
+ }
+ if (!fileTmp.renameTo(file)) {
+ throw new IOException(s"fail to rename file ${fileTmp} to ${file}")
+ }
+ }
+ blockManager.reportBlockStatus(blockId, BlockStatus(
+ StorageLevel(
+ useDisk = true,
+ useMemory = false,
+ useOffHeap = false,
+ deserialized = false,
+ replication = 0)
+ , 0, diskSize))
+ }
+
+ override def onFailure(streamId: String, cause: Throwable): Unit = {
+ // the framework handles the connection itself, we just need to do
local cleanup
Review comment:
The reason of the failure might be interesting here:
```suggestion
// the framework handles the connection itself, we just need to do
local cleanup
logWarning(s"Error while uploading $blockId", cause)
```
##########
File path:
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -55,6 +58,25 @@ private[spark] class IndexShuffleBlockResolver(
def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId,
mapId, None)
+ /**
+ * Get the shuffle files that are stored locally. Used for block migrations.
+ */
+ override def getStoredShuffles(): Set[(Int, Long)] = {
+ // Matches ShuffleIndexBlockId name
+ val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r
+ val rootDirs = blockManager.diskBlockManager.localDirs
+ // ExecutorDiskUtil puts things inside one level hashed sub directories
+ val searchDirs = rootDirs.flatMap(_.listFiles()).filter(_.isDirectory())
++ rootDirs
+ val filenames = searchDirs.flatMap(_.list())
+ logDebug(s"Got block files ${filenames.toList}")
+ filenames.flatMap{ fname =>
Review comment:
Nit:
```suggestion
filenames.flatMap { fname =>
```
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1790,6 +1813,107 @@ private[spark] class BlockManager(
}
}
+
+ // Shuffles which are either in queue for migrations or migrated
Review comment:
Nit: move the vals to the beginning of the class.
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -489,6 +492,23 @@ class BlockManagerMasterEndpoint(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Boolean = {
+ logInfo(s"Updating block info on master ${blockId} for ${blockManagerId}")
+
+ if (blockId.isInternalShuffle) {
+ blockId match {
+ case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+ // Don't update the map output on just the index block
+ logDebug("Received shuffle index block update for ${shuffleId}
${mapId}")
+ return true
+ case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) =>
+ logInfo("Received shuffle data block update for ${shuffleId}
${mapId}, performing update")
+ mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
+ return true
+ case _ =>
+ logDebug(s"Unexpected shuffle block type ${blockId}")
Review comment:
```suggestion
logDebug(s"Unexpected shuffle block type ${blockId}" +
s"as ${blockId.getClass().getSimpleName()}")
```
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
##########
@@ -22,85 +22,226 @@ import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext,
SparkFunSuite, Success}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark._
import org.apache.spark.internal.config
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd,
SparkListenerTaskStart}
+import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.util.{ResetSystemProperties, ThreadUtils}
class BlockManagerDecommissionSuite extends SparkFunSuite with
LocalSparkContext
- with ResetSystemProperties {
+ with ResetSystemProperties with Eventually {
+
+ val numExecs = 3
+ val numParts = 3
+
+ test(s"verify that an already running task which is going to cache data
succeeds " +
+ s"on a decommissioned executor") {
+ runDecomTest(true, false, true)
+ }
- override def beforeEach(): Unit = {
- val conf = new SparkConf().setAppName("test")
+ test(s"verify that shuffle blocks are migrated with force to disk") {
+ runDecomTest(false, true, false, remoteBlockSize = "1")
+ }
+
+ test(s"verify that shuffle blocks are migrated") {
+ runDecomTest(false, true, false)
+ }
+
+ test(s"verify that both migrations can work at the same time.") {
+ runDecomTest(true, true, false)
+ }
+
+ private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring:
Boolean,
+ remoteBlockSize: String = "100000") = {
+
+ val master = s"local-cluster[${numExecs}, 1, 1024]"
+ val conf = new SparkConf().setAppName("test").setMaster(master)
.set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_ENABLED, true)
+ .set(config.STORAGE_RDD_DECOMMISSION_ENABLED, persist)
+ .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle)
+ // Just replicate blocks as fast as we can during testing, there isn't
another
+ // workload we need to worry about.
+ .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
- sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
- }
+ // Allow force fetching to local disk
+ conf.set("spark.network.maxRemoteBlockSizeFetchToMem", remoteBlockSize)
+
+ sc = new SparkContext(master, "test", conf)
- test(s"verify that an already running task which is going to cache data
succeeds " +
- s"on a decommissioned executor") {
// Create input RDD with 10 partitions
- val input = sc.parallelize(1 to 10, 10)
+ val input = sc.parallelize(1 to numParts, numParts)
val accum = sc.longAccumulator("mapperRunAccumulator")
// Do a count to wait for the executors to be registered.
input.count()
// Create a new RDD where we have sleep in each partition, we are also
increasing
// the value of accumulator in each partition
- val sleepyRdd = input.mapPartitions { x =>
- Thread.sleep(500)
+ val baseRdd = input.mapPartitions { x =>
+ if (migrateDuring) {
+ Thread.sleep(500)
+ }
accum.add(1)
- x
+ x.map(y => (y, y))
+ }
+ val testRdd = shuffle match {
+ case true => baseRdd.reduceByKey(_ + _)
+ case false => baseRdd
}
- // Listen for the job
- val sem = new Semaphore(0)
+ // Listen for the job & block updates
+ val taskStartSem = new Semaphore(0)
+ val broadcastSem = new Semaphore(0)
+ val executorRemovedSem = new Semaphore(0)
val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+ val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
sc.addSparkListener(new SparkListener {
+
+ override def onExecutorRemoved(execRemoved:
SparkListenerExecutorRemoved): Unit = {
+ executorRemovedSem.release()
+ }
+
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
- sem.release()
+ taskStartSem.release()
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
taskEndEvents.append(taskEnd)
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated):
Unit = {
+ // Once broadcast start landing on the executors we're good to proceed.
+ // We don't only use task start as it can occur before the work is on
the executor.
+ if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
+ broadcastSem.release()
+ }
+ blocksUpdated.append(blockUpdated)
+ }
})
+
// Cache the RDD lazily
- sleepyRdd.persist()
+ if (persist) {
+ testRdd.persist()
+ }
- // Start the computation of RDD - this step will also cache the RDD
- val asyncCount = sleepyRdd.countAsync()
+ // Wait for the first executor to start
+ TestUtils.waitUntilExecutorsUp(sc = sc,
+ numExecutors = 1,
+ timeout = 20000) // 20s
- // Wait for the job to have started
- sem.acquire(1)
+ // Start the computation of RDD - this step will also cache the RDD
+ val asyncCount = testRdd.countAsync()
+
+ // Wait for all of the executors to start
+ TestUtils.waitUntilExecutorsUp(sc = sc,
+ // We need to make sure there is the original plus one exec to migrate
too, we don't need
+ // the full set.
+ numExecutors = 2,
+ timeout = 30000) // 30s
+
+ // Wait for the job to have started.
+ taskStartSem.acquire(1)
+ // Wait for each executor + driver to have it's broadcast info delivered.
+ broadcastSem.acquire((numExecs + 1))
+
+ // Make sure the job is either mid run or otherwise has data to migrate.
+ if (migrateDuring) {
+ // Give Spark a tiny bit to start executing after the broadcast blocks
land.
+ // For me this works at 100, set to 300 for system variance.
+ Thread.sleep(300)
+ } else {
+ ThreadUtils.awaitResult(asyncCount, 15.seconds)
+ }
- // Give Spark a tiny bit to start the tasks after the listener says hello
- Thread.sleep(100)
// Decommission one of the executor
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
- assert(execs.size == 2, s"Expected 2 executors but found ${execs.size}")
+ assert(execs.size == numExecs, s"Expected ${numExecs} executors but found
${execs.size}")
+
val execToDecommission = execs.head
+ logDebug(s"Decommissioning executor ${execToDecommission}")
sched.decommissionExecutor(execToDecommission)
// Wait for job to finish
- val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 6.seconds)
- assert(asyncCountResult === 10)
- // All 10 tasks finished, so accum should have been increased 10 times
- assert(accum.value === 10)
+ val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
+ assert(asyncCountResult === numParts)
+ // All tasks finished, so accum should have been increased numParts times
+ assert(accum.value === numParts)
// All tasks should be successful, nothing should have failed
sc.listenerBus.waitUntilEmpty()
- assert(taskEndEvents.size === 10) // 10 mappers
- assert(taskEndEvents.map(_.reason).toSet === Set(Success))
+ if (shuffle) {
+ // mappers & reducers which succeeded
+ assert(taskEndEvents.count(_.reason == Success) === 2 * numParts,
+ s"Expected ${2 * numParts} tasks got ${taskEndEvents.size}
(${taskEndEvents})")
+ } else {
+ // only mappers which executed successfully
+ assert(taskEndEvents.count(_.reason == Success) === numParts,
+ s"Expected ${numParts} tasks got ${taskEndEvents.size}
(${taskEndEvents})")
+ }
- // Since the RDD is cached, so further usage of same RDD should use the
+ // Wait for our respective blocks to have migrated
+ eventually(timeout(15.seconds), interval(10.milliseconds)) {
+ if (persist) {
+ // One of our blocks should have moved.
+ val rddUpdates = blocksUpdated.filter{update =>
+ val blockId = update.blockUpdatedInfo.blockId
+ blockId.isRDD}
+ val blockLocs = rddUpdates.map{ update =>
+ (update.blockUpdatedInfo.blockId.name,
+ update.blockUpdatedInfo.blockManagerId)}
+ val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
+ assert(!blocksToManagers.filter(_._2 > 1).isEmpty,
+ s"We should have a block that has been on multiple BMs in rdds:\n
${rddUpdates} from:\n" +
+ s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
+ }
+ // If we're migrating shuffles we look for any shuffle block updates
+ // as there is no block update on the initial shuffle block write.
+ if (shuffle) {
+ val numDataLocs = blocksUpdated.filter{ update =>
+ val blockId = update.blockUpdatedInfo.blockId
+ blockId.isInstanceOf[ShuffleDataBlockId]
+ }.size
+ val numIndexLocs = blocksUpdated.filter{ update =>
Review comment:
```suggestion
val numIndexLocs = blocksUpdated.filter { update =>
```
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -650,6 +657,19 @@ private[spark] class BlockManager(
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[_]): StreamCallbackWithID = {
+ // Delegate shuffle blocks here to resolver if supported
+ if (blockId.isShuffle || blockId.isInternalShuffle) {
Review comment:
Is not `isInternalShuffle` sufficient here?
```suggestion
if (blockId.isInternalShuffle) {
```
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1790,6 +1813,107 @@ private[spark] class BlockManager(
}
}
+
+ // Shuffles which are either in queue for migrations or migrated
+ private val migratingShuffles = mutable.HashSet[(Int, Long)]()
+ // Shuffles which are queued for migration
+ private val shufflesToMigrate = new
java.util.concurrent.ConcurrentLinkedQueue[(Int, Long)]()
+
+
+ private class ShuffleMigrationRunnable(peer: BlockManagerId) extends
Runnable {
+ @volatile var running = true
+ override def run(): Unit = {
+ var migrating: Option[(Int, Long)] = None
+ val storageLevel = StorageLevel(
+ useDisk = true,
+ useMemory = false,
+ useOffHeap = false,
+ deserialized = false,
+ replication = 1)
+ logInfo(s"Starting migration thread for ${peer}")
+ // Once a block fails to transfer to an executor stop trying to transfer
more blocks
+ try {
+ while (running) {
+ val migrating = Option(shufflesToMigrate.poll())
+ migrating match {
+ case None =>
+ logInfo("Nothing to migrate")
+ // Nothing to do right now, but maybe a transfer will fail or a
new block
+ // will finish being committed.
+ val SLEEP_TIME_SECS = 1
+ Thread.sleep(SLEEP_TIME_SECS * 1000L)
+ case Some((shuffleId, mapId)) =>
+ logInfo(s"Trying to migrate shuffle ${shuffleId},${mapId} to
${peer}")
+ val blocks =
+ migratableResolver.getMigrationBlocks(shuffleId, mapId)
+ logInfo(s"Got migration sub-blocks ${blocks}")
+ blocks.foreach { case (blockId, buffer) =>
+ logInfo(s"Migrating sub-block ${blockId}")
+ blockTransferService.uploadBlockSync(
+ peer.host,
+ peer.port,
+ peer.executorId,
+ blockId,
+ buffer,
+ storageLevel,
+ null)// class tag, we don't need for shuffle
+ logInfo(s"Migrated sub block ${blockId}")
+ }
+ logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}")
+ }
+ }
+ // This catch is intentionally outside of the while running block.
+ // if we encounter errors migrating to an executor we want to stop.
+ } catch {
+ case e: Exception =>
+ migrating match {
+ case Some(shuffleMap) =>
+ logError("Error ${e} during migration, adding ${shuffleMap} back
to migration queue")
+ shufflesToMigrate.add(shuffleMap)
+ case None =>
+ logError("Error ${e} while waiting for block to migrate")
+ }
+ }
+ }
+ }
+
+ private val migrationPeers = mutable.HashMap[BlockManagerId,
ShuffleMigrationRunnable]()
+
+ /**
+ * Tries to offload all shuffle blocks that are registered with the shuffle
service locally.
+ * Note: this does not delete the shuffle files in-case there is an
in-progress fetch
+ * but rather shadows them.
+ * Requires an Indexed based shuffle resolver.
+ */
+ def offloadShuffleBlocks(): Unit = {
+ // Update the queue of shuffles to be migrated
+ logInfo("Offloading shuffle blocks")
+ val localShuffles = migratableResolver.getStoredShuffles()
+ logInfo(s"My local shuffles are ${localShuffles.toList}")
+ val newShufflesToMigrate = localShuffles.&~(migratingShuffles).toSeq
+ logInfo(s"My new shuffles to migrate ${newShufflesToMigrate.toList}")
+ shufflesToMigrate.addAll(newShufflesToMigrate.asJava)
+ migratingShuffles ++= newShufflesToMigrate
+
+ // Update the threads doing migrations
+ // TODO: Sort & only start as many threads as min(||blocks||, ||targets||)
using location pref
+ val livePeerSet = getPeers(false).toSet
+ val currentPeerSet = migrationPeers.keys.toSet
+ val deadPeers = currentPeerSet.&~(livePeerSet)
+ val newPeers = livePeerSet.&~(currentPeerSet)
+ migrationPeers ++= newPeers.map{peer =>
Review comment:
Nit:
```suggestion
migrationPeers ++= newPeers.map { peer =>
```
##########
File path:
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -148,6 +170,86 @@ private[spark] class IndexShuffleBlockResolver(
}
}
+ /**
+ * Write a provided shuffle block as a stream. Used for block migrations.
+ * ShuffleBlockBatchIds must contain the full range represented in the
ShuffleIndexBlock.
+ * Requires the caller to delete any shuffle index blocks where the shuffle
block fails to
+ * put.
+ */
+ override def putShuffleBlockAsStream(blockId: BlockId, serializerManager:
SerializerManager):
+ StreamCallbackWithID = {
+ val file = blockId match {
+ case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+ getIndexFile(shuffleId, mapId)
+ case ShuffleDataBlockId(shuffleId, mapId, _) =>
+ getDataFile(shuffleId, mapId)
+ case _ =>
+ throw new Exception(s"Unexpected shuffle block transfer ${blockId} as
" +
+ s"${blockId.getClass().getSimpleName()}")
+ }
+ val fileTmp = Utils.tempFileWith(file)
+ val channel = Channels.newChannel(
+ serializerManager.wrapStream(blockId,
+ new FileOutputStream(fileTmp)))
+
+ new StreamCallbackWithID {
+
+ override def getID: String = blockId.name
+
+ override def onData(streamId: String, buf: ByteBuffer): Unit = {
+ while (buf.hasRemaining) {
+ channel.write(buf)
+ }
+ }
+
+ override def onComplete(streamId: String): Unit = {
+ logTrace(s"Done receiving block $blockId, now putting into local
shuffle service")
Review comment:
Nit: I would avoid to use the expression "shuffle service" what about
"putting into local disk store of the block manager" or just "putting into the
local disk store"?
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1790,6 +1813,107 @@ private[spark] class BlockManager(
}
}
+
+ // Shuffles which are either in queue for migrations or migrated
+ private val migratingShuffles = mutable.HashSet[(Int, Long)]()
+ // Shuffles which are queued for migration
+ private val shufflesToMigrate = new
java.util.concurrent.ConcurrentLinkedQueue[(Int, Long)]()
+
+
+ private class ShuffleMigrationRunnable(peer: BlockManagerId) extends
Runnable {
+ @volatile var running = true
+ override def run(): Unit = {
+ var migrating: Option[(Int, Long)] = None
+ val storageLevel = StorageLevel(
+ useDisk = true,
+ useMemory = false,
+ useOffHeap = false,
+ deserialized = false,
+ replication = 1)
+ logInfo(s"Starting migration thread for ${peer}")
+ // Once a block fails to transfer to an executor stop trying to transfer
more blocks
+ try {
+ while (running) {
+ val migrating = Option(shufflesToMigrate.poll())
+ migrating match {
+ case None =>
+ logInfo("Nothing to migrate")
+ // Nothing to do right now, but maybe a transfer will fail or a
new block
+ // will finish being committed.
+ val SLEEP_TIME_SECS = 1
+ Thread.sleep(SLEEP_TIME_SECS * 1000L)
+ case Some((shuffleId, mapId)) =>
+ logInfo(s"Trying to migrate shuffle ${shuffleId},${mapId} to
${peer}")
+ val blocks =
+ migratableResolver.getMigrationBlocks(shuffleId, mapId)
+ logInfo(s"Got migration sub-blocks ${blocks}")
+ blocks.foreach { case (blockId, buffer) =>
+ logInfo(s"Migrating sub-block ${blockId}")
+ blockTransferService.uploadBlockSync(
+ peer.host,
+ peer.port,
+ peer.executorId,
+ blockId,
+ buffer,
+ storageLevel,
+ null)// class tag, we don't need for shuffle
+ logInfo(s"Migrated sub block ${blockId}")
+ }
+ logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}")
+ }
+ }
+ // This catch is intentionally outside of the while running block.
+ // if we encounter errors migrating to an executor we want to stop.
+ } catch {
+ case e: Exception =>
+ migrating match {
+ case Some(shuffleMap) =>
+ logError("Error ${e} during migration, adding ${shuffleMap} back
to migration queue")
+ shufflesToMigrate.add(shuffleMap)
+ case None =>
+ logError("Error ${e} while waiting for block to migrate")
+ }
+ }
+ }
+ }
+
+ private val migrationPeers = mutable.HashMap[BlockManagerId,
ShuffleMigrationRunnable]()
+
+ /**
+ * Tries to offload all shuffle blocks that are registered with the shuffle
service locally.
+ * Note: this does not delete the shuffle files in-case there is an
in-progress fetch
+ * but rather shadows them.
+ * Requires an Indexed based shuffle resolver.
+ */
+ def offloadShuffleBlocks(): Unit = {
+ // Update the queue of shuffles to be migrated
+ logInfo("Offloading shuffle blocks")
+ val localShuffles = migratableResolver.getStoredShuffles()
+ logInfo(s"My local shuffles are ${localShuffles.toList}")
+ val newShufflesToMigrate = localShuffles.&~(migratingShuffles).toSeq
+ logInfo(s"My new shuffles to migrate ${newShufflesToMigrate.toList}")
+ shufflesToMigrate.addAll(newShufflesToMigrate.asJava)
+ migratingShuffles ++= newShufflesToMigrate
+
+ // Update the threads doing migrations
+ // TODO: Sort & only start as many threads as min(||blocks||, ||targets||)
using location pref
+ val livePeerSet = getPeers(false).toSet
+ val currentPeerSet = migrationPeers.keys.toSet
+ val deadPeers = currentPeerSet.&~(livePeerSet)
+ val newPeers = livePeerSet.&~(currentPeerSet)
+ migrationPeers ++= newPeers.map{peer =>
+ logDebug(s"Starting thread to migrate shuffle blocks to ${peer}")
+ val executor =
ThreadUtils.newDaemonSingleThreadExecutor(s"migrate-shuffle-to-${peer}")
+ val runnable = new ShuffleMigrationRunnable(peer)
+ executor.submit(runnable)
Review comment:
I would shutdown the `executor` in the `BlockManger#close()` to avoid
thread leaks and would rename to `migrationExecutionContext` as the name
executor at that level could be very misleading.
To do that I would introduce a new member val at the outer scope (at the
BlockManger class):
```scala
private var migrationExecutionContext: Option[ExecutorService] = None
```
Then change the lines to:
```suggestion
migrationExecutionContext =
Some(ThreadUtils.newDaemonSingleThreadExecutor(s"migrate-shuffle-to-${peer}"))
val runnable = new ShuffleMigrationRunnable(peer)
migrationExecutionContext.foreach(_.submit(runnable))
```
This way in the `BlockManager#stop()` you can cal the shut down:
```scala
migrationExecutionContext.foreach(_.shutdownNow())
```
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -650,6 +657,19 @@ private[spark] class BlockManager(
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[_]): StreamCallbackWithID = {
+ // Delegate shuffle blocks here to resolver if supported
Review comment:
Nit: I think the comment is not needed as your code is self-explanatory
here:
```suggestion
```
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1790,6 +1813,107 @@ private[spark] class BlockManager(
}
}
+
+ // Shuffles which are either in queue for migrations or migrated
+ private val migratingShuffles = mutable.HashSet[(Int, Long)]()
+ // Shuffles which are queued for migration
+ private val shufflesToMigrate = new
java.util.concurrent.ConcurrentLinkedQueue[(Int, Long)]()
+
+
+ private class ShuffleMigrationRunnable(peer: BlockManagerId) extends
Runnable {
+ @volatile var running = true
+ override def run(): Unit = {
+ var migrating: Option[(Int, Long)] = None
+ val storageLevel = StorageLevel(
+ useDisk = true,
+ useMemory = false,
+ useOffHeap = false,
+ deserialized = false,
+ replication = 1)
+ logInfo(s"Starting migration thread for ${peer}")
+ // Once a block fails to transfer to an executor stop trying to transfer
more blocks
+ try {
+ while (running) {
+ val migrating = Option(shufflesToMigrate.poll())
+ migrating match {
+ case None =>
+ logInfo("Nothing to migrate")
+ // Nothing to do right now, but maybe a transfer will fail or a
new block
+ // will finish being committed.
+ val SLEEP_TIME_SECS = 1
+ Thread.sleep(SLEEP_TIME_SECS * 1000L)
+ case Some((shuffleId, mapId)) =>
+ logInfo(s"Trying to migrate shuffle ${shuffleId},${mapId} to
${peer}")
+ val blocks =
+ migratableResolver.getMigrationBlocks(shuffleId, mapId)
+ logInfo(s"Got migration sub-blocks ${blocks}")
+ blocks.foreach { case (blockId, buffer) =>
+ logInfo(s"Migrating sub-block ${blockId}")
+ blockTransferService.uploadBlockSync(
+ peer.host,
+ peer.port,
+ peer.executorId,
+ blockId,
+ buffer,
+ storageLevel,
+ null)// class tag, we don't need for shuffle
+ logInfo(s"Migrated sub block ${blockId}")
+ }
+ logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}")
+ }
+ }
+ // This catch is intentionally outside of the while running block.
+ // if we encounter errors migrating to an executor we want to stop.
+ } catch {
+ case e: Exception =>
+ migrating match {
+ case Some(shuffleMap) =>
+ logError("Error ${e} during migration, adding ${shuffleMap} back
to migration queue")
+ shufflesToMigrate.add(shuffleMap)
+ case None =>
+ logError("Error ${e} while waiting for block to migrate")
+ }
+ }
+ }
+ }
+
+ private val migrationPeers = mutable.HashMap[BlockManagerId,
ShuffleMigrationRunnable]()
+
+ /**
+ * Tries to offload all shuffle blocks that are registered with the shuffle
service locally.
+ * Note: this does not delete the shuffle files in-case there is an
in-progress fetch
+ * but rather shadows them.
+ * Requires an Indexed based shuffle resolver.
+ */
+ def offloadShuffleBlocks(): Unit = {
+ // Update the queue of shuffles to be migrated
+ logInfo("Offloading shuffle blocks")
+ val localShuffles = migratableResolver.getStoredShuffles()
+ logInfo(s"My local shuffles are ${localShuffles.toList}")
+ val newShufflesToMigrate = localShuffles.&~(migratingShuffles).toSeq
+ logInfo(s"My new shuffles to migrate ${newShufflesToMigrate.toList}")
+ shufflesToMigrate.addAll(newShufflesToMigrate.asJava)
+ migratingShuffles ++= newShufflesToMigrate
+
+ // Update the threads doing migrations
+ // TODO: Sort & only start as many threads as min(||blocks||, ||targets||)
using location pref
+ val livePeerSet = getPeers(false).toSet
+ val currentPeerSet = migrationPeers.keys.toSet
+ val deadPeers = currentPeerSet.&~(livePeerSet)
+ val newPeers = livePeerSet.&~(currentPeerSet)
+ migrationPeers ++= newPeers.map{peer =>
+ logDebug(s"Starting thread to migrate shuffle blocks to ${peer}")
+ val executor =
ThreadUtils.newDaemonSingleThreadExecutor(s"migrate-shuffle-to-${peer}")
+ val runnable = new ShuffleMigrationRunnable(peer)
+ executor.submit(runnable)
+ (peer, runnable)
+ }
+ // A peer may have entered a decommissioning state, don't transfer any new
blocks
+ deadPeers.foreach{peer =>
Review comment:
Nit:
```suggestion
deadPeers.foreach { peer =>
```
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
##########
@@ -22,85 +22,226 @@ import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext,
SparkFunSuite, Success}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark._
import org.apache.spark.internal.config
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd,
SparkListenerTaskStart}
+import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.util.{ResetSystemProperties, ThreadUtils}
class BlockManagerDecommissionSuite extends SparkFunSuite with
LocalSparkContext
- with ResetSystemProperties {
+ with ResetSystemProperties with Eventually {
+
+ val numExecs = 3
+ val numParts = 3
+
+ test(s"verify that an already running task which is going to cache data
succeeds " +
+ s"on a decommissioned executor") {
+ runDecomTest(true, false, true)
+ }
- override def beforeEach(): Unit = {
- val conf = new SparkConf().setAppName("test")
+ test(s"verify that shuffle blocks are migrated with force to disk") {
+ runDecomTest(false, true, false, remoteBlockSize = "1")
+ }
+
+ test(s"verify that shuffle blocks are migrated") {
+ runDecomTest(false, true, false)
+ }
+
+ test(s"verify that both migrations can work at the same time.") {
+ runDecomTest(true, true, false)
+ }
+
+ private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring:
Boolean,
+ remoteBlockSize: String = "100000") = {
+
+ val master = s"local-cluster[${numExecs}, 1, 1024]"
+ val conf = new SparkConf().setAppName("test").setMaster(master)
.set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_ENABLED, true)
+ .set(config.STORAGE_RDD_DECOMMISSION_ENABLED, persist)
+ .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle)
+ // Just replicate blocks as fast as we can during testing, there isn't
another
+ // workload we need to worry about.
+ .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
- sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
- }
+ // Allow force fetching to local disk
+ conf.set("spark.network.maxRemoteBlockSizeFetchToMem", remoteBlockSize)
+
+ sc = new SparkContext(master, "test", conf)
- test(s"verify that an already running task which is going to cache data
succeeds " +
- s"on a decommissioned executor") {
// Create input RDD with 10 partitions
- val input = sc.parallelize(1 to 10, 10)
+ val input = sc.parallelize(1 to numParts, numParts)
val accum = sc.longAccumulator("mapperRunAccumulator")
// Do a count to wait for the executors to be registered.
input.count()
// Create a new RDD where we have sleep in each partition, we are also
increasing
// the value of accumulator in each partition
- val sleepyRdd = input.mapPartitions { x =>
- Thread.sleep(500)
+ val baseRdd = input.mapPartitions { x =>
+ if (migrateDuring) {
+ Thread.sleep(500)
+ }
accum.add(1)
- x
+ x.map(y => (y, y))
+ }
+ val testRdd = shuffle match {
+ case true => baseRdd.reduceByKey(_ + _)
+ case false => baseRdd
}
- // Listen for the job
- val sem = new Semaphore(0)
+ // Listen for the job & block updates
+ val taskStartSem = new Semaphore(0)
+ val broadcastSem = new Semaphore(0)
+ val executorRemovedSem = new Semaphore(0)
val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+ val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
sc.addSparkListener(new SparkListener {
+
+ override def onExecutorRemoved(execRemoved:
SparkListenerExecutorRemoved): Unit = {
+ executorRemovedSem.release()
+ }
+
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
- sem.release()
+ taskStartSem.release()
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
taskEndEvents.append(taskEnd)
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated):
Unit = {
+ // Once broadcast start landing on the executors we're good to proceed.
+ // We don't only use task start as it can occur before the work is on
the executor.
+ if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
+ broadcastSem.release()
+ }
+ blocksUpdated.append(blockUpdated)
+ }
})
+
// Cache the RDD lazily
- sleepyRdd.persist()
+ if (persist) {
+ testRdd.persist()
+ }
- // Start the computation of RDD - this step will also cache the RDD
- val asyncCount = sleepyRdd.countAsync()
+ // Wait for the first executor to start
+ TestUtils.waitUntilExecutorsUp(sc = sc,
+ numExecutors = 1,
+ timeout = 20000) // 20s
- // Wait for the job to have started
- sem.acquire(1)
+ // Start the computation of RDD - this step will also cache the RDD
+ val asyncCount = testRdd.countAsync()
+
+ // Wait for all of the executors to start
+ TestUtils.waitUntilExecutorsUp(sc = sc,
+ // We need to make sure there is the original plus one exec to migrate
too, we don't need
+ // the full set.
+ numExecutors = 2,
+ timeout = 30000) // 30s
+
+ // Wait for the job to have started.
+ taskStartSem.acquire(1)
+ // Wait for each executor + driver to have it's broadcast info delivered.
+ broadcastSem.acquire((numExecs + 1))
+
+ // Make sure the job is either mid run or otherwise has data to migrate.
+ if (migrateDuring) {
+ // Give Spark a tiny bit to start executing after the broadcast blocks
land.
+ // For me this works at 100, set to 300 for system variance.
+ Thread.sleep(300)
+ } else {
+ ThreadUtils.awaitResult(asyncCount, 15.seconds)
+ }
- // Give Spark a tiny bit to start the tasks after the listener says hello
- Thread.sleep(100)
// Decommission one of the executor
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
- assert(execs.size == 2, s"Expected 2 executors but found ${execs.size}")
+ assert(execs.size == numExecs, s"Expected ${numExecs} executors but found
${execs.size}")
+
val execToDecommission = execs.head
+ logDebug(s"Decommissioning executor ${execToDecommission}")
sched.decommissionExecutor(execToDecommission)
// Wait for job to finish
- val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 6.seconds)
- assert(asyncCountResult === 10)
- // All 10 tasks finished, so accum should have been increased 10 times
- assert(accum.value === 10)
+ val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
+ assert(asyncCountResult === numParts)
+ // All tasks finished, so accum should have been increased numParts times
+ assert(accum.value === numParts)
// All tasks should be successful, nothing should have failed
sc.listenerBus.waitUntilEmpty()
- assert(taskEndEvents.size === 10) // 10 mappers
- assert(taskEndEvents.map(_.reason).toSet === Set(Success))
+ if (shuffle) {
+ // mappers & reducers which succeeded
+ assert(taskEndEvents.count(_.reason == Success) === 2 * numParts,
+ s"Expected ${2 * numParts} tasks got ${taskEndEvents.size}
(${taskEndEvents})")
+ } else {
+ // only mappers which executed successfully
+ assert(taskEndEvents.count(_.reason == Success) === numParts,
+ s"Expected ${numParts} tasks got ${taskEndEvents.size}
(${taskEndEvents})")
+ }
- // Since the RDD is cached, so further usage of same RDD should use the
+ // Wait for our respective blocks to have migrated
+ eventually(timeout(15.seconds), interval(10.milliseconds)) {
+ if (persist) {
+ // One of our blocks should have moved.
+ val rddUpdates = blocksUpdated.filter{update =>
Review comment:
```suggestion
val rddUpdates = blocksUpdated.filter { update =>
```
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
##########
@@ -22,85 +22,226 @@ import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext,
SparkFunSuite, Success}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark._
import org.apache.spark.internal.config
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd,
SparkListenerTaskStart}
+import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.util.{ResetSystemProperties, ThreadUtils}
class BlockManagerDecommissionSuite extends SparkFunSuite with
LocalSparkContext
- with ResetSystemProperties {
+ with ResetSystemProperties with Eventually {
+
+ val numExecs = 3
+ val numParts = 3
+
+ test(s"verify that an already running task which is going to cache data
succeeds " +
+ s"on a decommissioned executor") {
+ runDecomTest(true, false, true)
+ }
- override def beforeEach(): Unit = {
- val conf = new SparkConf().setAppName("test")
+ test(s"verify that shuffle blocks are migrated with force to disk") {
+ runDecomTest(false, true, false, remoteBlockSize = "1")
+ }
+
+ test(s"verify that shuffle blocks are migrated") {
+ runDecomTest(false, true, false)
+ }
+
+ test(s"verify that both migrations can work at the same time.") {
+ runDecomTest(true, true, false)
+ }
+
+ private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring:
Boolean,
+ remoteBlockSize: String = "100000") = {
+
+ val master = s"local-cluster[${numExecs}, 1, 1024]"
+ val conf = new SparkConf().setAppName("test").setMaster(master)
.set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_ENABLED, true)
+ .set(config.STORAGE_RDD_DECOMMISSION_ENABLED, persist)
+ .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle)
+ // Just replicate blocks as fast as we can during testing, there isn't
another
+ // workload we need to worry about.
+ .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
- sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
- }
+ // Allow force fetching to local disk
+ conf.set("spark.network.maxRemoteBlockSizeFetchToMem", remoteBlockSize)
+
+ sc = new SparkContext(master, "test", conf)
- test(s"verify that an already running task which is going to cache data
succeeds " +
- s"on a decommissioned executor") {
// Create input RDD with 10 partitions
- val input = sc.parallelize(1 to 10, 10)
+ val input = sc.parallelize(1 to numParts, numParts)
val accum = sc.longAccumulator("mapperRunAccumulator")
// Do a count to wait for the executors to be registered.
input.count()
// Create a new RDD where we have sleep in each partition, we are also
increasing
// the value of accumulator in each partition
- val sleepyRdd = input.mapPartitions { x =>
- Thread.sleep(500)
+ val baseRdd = input.mapPartitions { x =>
+ if (migrateDuring) {
+ Thread.sleep(500)
+ }
accum.add(1)
- x
+ x.map(y => (y, y))
+ }
+ val testRdd = shuffle match {
+ case true => baseRdd.reduceByKey(_ + _)
+ case false => baseRdd
}
- // Listen for the job
- val sem = new Semaphore(0)
+ // Listen for the job & block updates
+ val taskStartSem = new Semaphore(0)
+ val broadcastSem = new Semaphore(0)
+ val executorRemovedSem = new Semaphore(0)
val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+ val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
sc.addSparkListener(new SparkListener {
+
+ override def onExecutorRemoved(execRemoved:
SparkListenerExecutorRemoved): Unit = {
+ executorRemovedSem.release()
+ }
+
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
- sem.release()
+ taskStartSem.release()
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
taskEndEvents.append(taskEnd)
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated):
Unit = {
+ // Once broadcast start landing on the executors we're good to proceed.
+ // We don't only use task start as it can occur before the work is on
the executor.
+ if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
+ broadcastSem.release()
+ }
+ blocksUpdated.append(blockUpdated)
+ }
})
+
// Cache the RDD lazily
- sleepyRdd.persist()
+ if (persist) {
+ testRdd.persist()
+ }
- // Start the computation of RDD - this step will also cache the RDD
- val asyncCount = sleepyRdd.countAsync()
+ // Wait for the first executor to start
+ TestUtils.waitUntilExecutorsUp(sc = sc,
+ numExecutors = 1,
+ timeout = 20000) // 20s
- // Wait for the job to have started
- sem.acquire(1)
+ // Start the computation of RDD - this step will also cache the RDD
+ val asyncCount = testRdd.countAsync()
+
+ // Wait for all of the executors to start
+ TestUtils.waitUntilExecutorsUp(sc = sc,
+ // We need to make sure there is the original plus one exec to migrate
too, we don't need
+ // the full set.
+ numExecutors = 2,
+ timeout = 30000) // 30s
+
+ // Wait for the job to have started.
+ taskStartSem.acquire(1)
+ // Wait for each executor + driver to have it's broadcast info delivered.
+ broadcastSem.acquire((numExecs + 1))
+
+ // Make sure the job is either mid run or otherwise has data to migrate.
+ if (migrateDuring) {
+ // Give Spark a tiny bit to start executing after the broadcast blocks
land.
+ // For me this works at 100, set to 300 for system variance.
+ Thread.sleep(300)
+ } else {
+ ThreadUtils.awaitResult(asyncCount, 15.seconds)
+ }
- // Give Spark a tiny bit to start the tasks after the listener says hello
- Thread.sleep(100)
// Decommission one of the executor
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
- assert(execs.size == 2, s"Expected 2 executors but found ${execs.size}")
+ assert(execs.size == numExecs, s"Expected ${numExecs} executors but found
${execs.size}")
+
val execToDecommission = execs.head
+ logDebug(s"Decommissioning executor ${execToDecommission}")
sched.decommissionExecutor(execToDecommission)
// Wait for job to finish
- val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 6.seconds)
- assert(asyncCountResult === 10)
- // All 10 tasks finished, so accum should have been increased 10 times
- assert(accum.value === 10)
+ val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
+ assert(asyncCountResult === numParts)
+ // All tasks finished, so accum should have been increased numParts times
+ assert(accum.value === numParts)
// All tasks should be successful, nothing should have failed
sc.listenerBus.waitUntilEmpty()
- assert(taskEndEvents.size === 10) // 10 mappers
- assert(taskEndEvents.map(_.reason).toSet === Set(Success))
+ if (shuffle) {
+ // mappers & reducers which succeeded
+ assert(taskEndEvents.count(_.reason == Success) === 2 * numParts,
+ s"Expected ${2 * numParts} tasks got ${taskEndEvents.size}
(${taskEndEvents})")
+ } else {
+ // only mappers which executed successfully
+ assert(taskEndEvents.count(_.reason == Success) === numParts,
+ s"Expected ${numParts} tasks got ${taskEndEvents.size}
(${taskEndEvents})")
+ }
- // Since the RDD is cached, so further usage of same RDD should use the
+ // Wait for our respective blocks to have migrated
+ eventually(timeout(15.seconds), interval(10.milliseconds)) {
+ if (persist) {
+ // One of our blocks should have moved.
+ val rddUpdates = blocksUpdated.filter{update =>
+ val blockId = update.blockUpdatedInfo.blockId
+ blockId.isRDD}
+ val blockLocs = rddUpdates.map{ update =>
+ (update.blockUpdatedInfo.blockId.name,
+ update.blockUpdatedInfo.blockManagerId)}
+ val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
+ assert(!blocksToManagers.filter(_._2 > 1).isEmpty,
+ s"We should have a block that has been on multiple BMs in rdds:\n
${rddUpdates} from:\n" +
+ s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
+ }
+ // If we're migrating shuffles we look for any shuffle block updates
+ // as there is no block update on the initial shuffle block write.
+ if (shuffle) {
+ val numDataLocs = blocksUpdated.filter{ update =>
Review comment:
```suggestion
val numDataLocs = blocksUpdated.filter { update =>
```
##########
File path:
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
##########
@@ -325,21 +325,37 @@ class KubernetesSuite extends SparkFunSuite
val result = checkPodReady(namespace, name)
result shouldBe (true)
}
- // Look for the string that indicates we're good to clean up
+ // Look for the string that indicates we're good to trigger
decom
// on the driver
Review comment:
Nit: fits in one line:
```suggestion
// Look for the string that indicates we're good to trigger
decom on the driver
```
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
##########
@@ -22,85 +22,226 @@ import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext,
SparkFunSuite, Success}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark._
import org.apache.spark.internal.config
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd,
SparkListenerTaskStart}
+import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.util.{ResetSystemProperties, ThreadUtils}
class BlockManagerDecommissionSuite extends SparkFunSuite with
LocalSparkContext
- with ResetSystemProperties {
+ with ResetSystemProperties with Eventually {
+
+ val numExecs = 3
+ val numParts = 3
+
+ test(s"verify that an already running task which is going to cache data
succeeds " +
+ s"on a decommissioned executor") {
+ runDecomTest(true, false, true)
+ }
- override def beforeEach(): Unit = {
- val conf = new SparkConf().setAppName("test")
+ test(s"verify that shuffle blocks are migrated with force to disk") {
+ runDecomTest(false, true, false, remoteBlockSize = "1")
+ }
+
+ test(s"verify that shuffle blocks are migrated") {
+ runDecomTest(false, true, false)
+ }
+
+ test(s"verify that both migrations can work at the same time.") {
+ runDecomTest(true, true, false)
+ }
+
+ private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring:
Boolean,
+ remoteBlockSize: String = "100000") = {
+
+ val master = s"local-cluster[${numExecs}, 1, 1024]"
+ val conf = new SparkConf().setAppName("test").setMaster(master)
.set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_ENABLED, true)
+ .set(config.STORAGE_RDD_DECOMMISSION_ENABLED, persist)
+ .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle)
+ // Just replicate blocks as fast as we can during testing, there isn't
another
+ // workload we need to worry about.
+ .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
- sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
- }
+ // Allow force fetching to local disk
+ conf.set("spark.network.maxRemoteBlockSizeFetchToMem", remoteBlockSize)
+
+ sc = new SparkContext(master, "test", conf)
- test(s"verify that an already running task which is going to cache data
succeeds " +
- s"on a decommissioned executor") {
// Create input RDD with 10 partitions
- val input = sc.parallelize(1 to 10, 10)
+ val input = sc.parallelize(1 to numParts, numParts)
val accum = sc.longAccumulator("mapperRunAccumulator")
// Do a count to wait for the executors to be registered.
input.count()
// Create a new RDD where we have sleep in each partition, we are also
increasing
// the value of accumulator in each partition
- val sleepyRdd = input.mapPartitions { x =>
- Thread.sleep(500)
+ val baseRdd = input.mapPartitions { x =>
+ if (migrateDuring) {
+ Thread.sleep(500)
+ }
accum.add(1)
- x
+ x.map(y => (y, y))
+ }
+ val testRdd = shuffle match {
+ case true => baseRdd.reduceByKey(_ + _)
+ case false => baseRdd
}
- // Listen for the job
- val sem = new Semaphore(0)
+ // Listen for the job & block updates
+ val taskStartSem = new Semaphore(0)
+ val broadcastSem = new Semaphore(0)
+ val executorRemovedSem = new Semaphore(0)
val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+ val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
sc.addSparkListener(new SparkListener {
+
+ override def onExecutorRemoved(execRemoved:
SparkListenerExecutorRemoved): Unit = {
+ executorRemovedSem.release()
+ }
+
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
- sem.release()
+ taskStartSem.release()
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
taskEndEvents.append(taskEnd)
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated):
Unit = {
+ // Once broadcast start landing on the executors we're good to proceed.
+ // We don't only use task start as it can occur before the work is on
the executor.
+ if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
+ broadcastSem.release()
+ }
+ blocksUpdated.append(blockUpdated)
+ }
})
+
// Cache the RDD lazily
- sleepyRdd.persist()
+ if (persist) {
+ testRdd.persist()
+ }
- // Start the computation of RDD - this step will also cache the RDD
- val asyncCount = sleepyRdd.countAsync()
+ // Wait for the first executor to start
+ TestUtils.waitUntilExecutorsUp(sc = sc,
+ numExecutors = 1,
+ timeout = 20000) // 20s
- // Wait for the job to have started
- sem.acquire(1)
+ // Start the computation of RDD - this step will also cache the RDD
+ val asyncCount = testRdd.countAsync()
+
+ // Wait for all of the executors to start
+ TestUtils.waitUntilExecutorsUp(sc = sc,
+ // We need to make sure there is the original plus one exec to migrate
too, we don't need
+ // the full set.
+ numExecutors = 2,
+ timeout = 30000) // 30s
+
+ // Wait for the job to have started.
+ taskStartSem.acquire(1)
+ // Wait for each executor + driver to have it's broadcast info delivered.
+ broadcastSem.acquire((numExecs + 1))
+
+ // Make sure the job is either mid run or otherwise has data to migrate.
+ if (migrateDuring) {
+ // Give Spark a tiny bit to start executing after the broadcast blocks
land.
+ // For me this works at 100, set to 300 for system variance.
+ Thread.sleep(300)
+ } else {
+ ThreadUtils.awaitResult(asyncCount, 15.seconds)
+ }
- // Give Spark a tiny bit to start the tasks after the listener says hello
- Thread.sleep(100)
// Decommission one of the executor
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
- assert(execs.size == 2, s"Expected 2 executors but found ${execs.size}")
+ assert(execs.size == numExecs, s"Expected ${numExecs} executors but found
${execs.size}")
+
val execToDecommission = execs.head
+ logDebug(s"Decommissioning executor ${execToDecommission}")
sched.decommissionExecutor(execToDecommission)
// Wait for job to finish
- val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 6.seconds)
- assert(asyncCountResult === 10)
- // All 10 tasks finished, so accum should have been increased 10 times
- assert(accum.value === 10)
+ val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
+ assert(asyncCountResult === numParts)
+ // All tasks finished, so accum should have been increased numParts times
+ assert(accum.value === numParts)
// All tasks should be successful, nothing should have failed
sc.listenerBus.waitUntilEmpty()
- assert(taskEndEvents.size === 10) // 10 mappers
- assert(taskEndEvents.map(_.reason).toSet === Set(Success))
+ if (shuffle) {
+ // mappers & reducers which succeeded
+ assert(taskEndEvents.count(_.reason == Success) === 2 * numParts,
+ s"Expected ${2 * numParts} tasks got ${taskEndEvents.size}
(${taskEndEvents})")
+ } else {
+ // only mappers which executed successfully
+ assert(taskEndEvents.count(_.reason == Success) === numParts,
+ s"Expected ${numParts} tasks got ${taskEndEvents.size}
(${taskEndEvents})")
+ }
- // Since the RDD is cached, so further usage of same RDD should use the
+ // Wait for our respective blocks to have migrated
+ eventually(timeout(15.seconds), interval(10.milliseconds)) {
+ if (persist) {
+ // One of our blocks should have moved.
+ val rddUpdates = blocksUpdated.filter{update =>
+ val blockId = update.blockUpdatedInfo.blockId
+ blockId.isRDD}
+ val blockLocs = rddUpdates.map{ update =>
Review comment:
```suggestion
val blockLocs = rddUpdates.map { update =>
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]