attilapiros commented on a change in pull request #28331:
URL: https://github.com/apache/spark/pull/28331#discussion_r432527817
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
##########
@@ -66,41 +90,96 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
taskEndEvents.append(taskEnd)
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+ blocksUpdated.append(blockUpdated)
+ }
})
+
// Cache the RDD lazily
- sleepyRdd.persist()
+ if (persist) {
+ testRdd.persist()
+ }
+
+ // Wait for all of the executors to start
+ TestUtils.waitUntilExecutorsUp(sc = sc,
+ numExecutors = numExecs,
+ timeout = 10000) // 10s
// Start the computation of RDD - this step will also cache the RDD
- val asyncCount = sleepyRdd.countAsync()
+ val asyncCount = testRdd.countAsync()
// Wait for the job to have started
sem.acquire(1)
// Give Spark a tiny bit to start the tasks after the listener says hello
- Thread.sleep(100)
+ Thread.sleep(50)
+
// Decommission one of the executors
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
- assert(execs.size == 2, s"Expected 2 executors but found ${execs.size}")
+ assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")
+
val execToDecommission = execs.head
+ logDebug(s"Decommissioning executor ${execToDecommission}")
sched.decommissionExecutor(execToDecommission)
// Wait for job to finish
- val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 6.seconds)
+ val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
assert(asyncCountResult === 10)
// All 10 tasks finished, so accum should have been increased 10 times
assert(accum.value === 10)
// All tasks should be successful, nothing should have failed
sc.listenerBus.waitUntilEmpty()
- assert(taskEndEvents.size === 10) // 10 mappers
- assert(taskEndEvents.map(_.reason).toSet === Set(Success))
+ if (shuffle) {
+ // 10 mappers & 10 reducers which succeeded
+ assert(taskEndEvents.count(_.reason == Success) === 20,
+ s"Expected 20 tasks got ${taskEndEvents.size} (${taskEndEvents})")
+ } else {
+ // 10 mappers which executed successfully
+ assert(taskEndEvents.count(_.reason == Success) === 10,
+ s"Expected 10 tasks got ${taskEndEvents.size} (${taskEndEvents})")
+ }
- // Since the RDD is cached, so further usage of same RDD should use the
+ // Wait for our respective blocks to have migrated
+ eventually(timeout(15.seconds), interval(10.milliseconds)) {
+ if (persist) {
+ // One of our blocks should have moved.
+ val blockLocs = blocksUpdated.map{ update =>
Review comment:
As there is a test case where both RDD and shuffle block migration are
tested, we should filter for RDD blocks here. I actually tried it with:
```suggestion
      val blockLocs = blocksUpdated.filter(_.blockUpdatedInfo.blockId.isRDD).map{ update =>
```
But then the RDD tests started to fail, as it was not RDD blocks that
satisfied the condition below but broadcast variables. When I logged out
`blocksToManagers.filter(_._2 > 1)` I got:
```
broadcast_1_piece0 -> 4
```
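For reference, a minimal sketch of the grouping this refers to (variable names beyond `blocksUpdated` are assumptions; the PR's actual check may differ):
```scala
// Sketch: group the observed block updates by block ID and count the distinct
// block managers that reported each block. Without the isRDD filter, a
// replicated broadcast piece (e.g. broadcast_1_piece0 -> 4) can satisfy the
// "block on more than one manager" condition instead of a migrated RDD block.
val blocksToManagers = blocksUpdated
  .filter(_.blockUpdatedInfo.blockId.isRDD)
  .map(u => (u.blockUpdatedInfo.blockId, u.blockUpdatedInfo.blockManagerId))
  .groupBy(_._1)
  .mapValues(_.map(_._2).toSet.size)
assert(blocksToManagers.exists(_._2 > 1),
  s"Expected an RDD block on more than one manager: ${blocksToManagers}")
```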
##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -148,6 +170,85 @@ private[spark] class IndexShuffleBlockResolver(
}
}
+ /**
+ * Write a provided shuffle block as a stream. Used for block migrations.
+ * ShuffleBlockBatchIds must contain the full range represented in the ShuffleIndexBlock.
+ * Requires the caller to delete any shuffle index blocks where the shuffle block fails to
+ * put.
+ */
+ override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager):
+ StreamCallbackWithID = {
+ val file = blockId match {
+ case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+ getIndexFile(shuffleId, mapId)
+ case ShuffleBlockBatchId(shuffleId, mapId, _, _) =>
Review comment:
This must be `ShuffleDataBlockId`.
To get this method tested with small data, I suggest setting
`spark.network.maxRemoteBlockSizeFetchToMem` to a very low value, i.e. to 1.
With that setting, and without fixing this issue, the updated exception
message (see a few lines below) will be: `java.lang.Exception: Unexpected
shuffle block transfer shuffle_0_11_0.data as ShuffleDataBlockId`.
Unfortunately the test will still pass, as the corresponding index file
`shuffle_0_11_0.index` was successfully migrated and the assert in the suite
only checks whether at least one shuffle block was migrated:
https://github.com/apache/spark/blob/70c3871ed801ed2b5e964e321bcfaba33f8735af/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala#L164
The shuffle data file was probably still served from the decommissioned
executor; otherwise the recomputation would have been detected via the number
of successful tasks and the accumulator.
Please consider covering this case by introducing a new boolean flag to
control this setting, and checking at least one shuffle migration test with
this setting enabled.
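A possible shape for that flag, as a sketch only (the helper name is hypothetical; `spark.network.maxRemoteBlockSizeFetchToMem` is an existing Spark setting):
```scala
import org.apache.spark.SparkConf

// Hypothetical helper: when fetchToDisk is set, force every remote block to
// take the stream-to-disk path, so putShuffleBlockAsStream is exercised even
// with the tiny shuffle files this suite produces.
def maybeForceFetchToDisk(conf: SparkConf, fetchToDisk: Boolean): SparkConf = {
  if (fetchToDisk) {
    // 1 byte: all remote blocks exceed the in-memory limit.
    conf.set("spark.network.maxRemoteBlockSizeFetchToMem", "1")
  } else {
    conf
  }
}
```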
##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -148,6 +170,85 @@ private[spark] class IndexShuffleBlockResolver(
}
}
+ /**
+ * Write a provided shuffle block as a stream. Used for block migrations.
+ * ShuffleBlockBatchIds must contain the full range represented in the ShuffleIndexBlock.
+ * Requires the caller to delete any shuffle index blocks where the shuffle block fails to
+ * put.
+ */
+ override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager):
+ StreamCallbackWithID = {
+ val file = blockId match {
+ case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+ getIndexFile(shuffleId, mapId)
+ case ShuffleBlockBatchId(shuffleId, mapId, _, _) =>
+ getDataFile(shuffleId, mapId)
+ case _ =>
+ throw new Exception(s"Unexpected shuffle block transfer ${blockId}")
Review comment:
```suggestion
        throw new Exception(s"Unexpected shuffle block transfer ${blockId} as " +
          s"${blockId.getClass().getSimpleName()}")
```
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -505,6 +509,18 @@ class BlockManagerMasterEndpoint(
return true
}
+ if (blockId.isInternalShuffle && storageLevel.isValid) {
+ blockId match {
+ case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+ // Don't update the map output on just the index block
+ logDebug("Received shuffle index block update for ${shuffleId}
${mapId}")
+ case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) =>
+ mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
+ case _ =>
+ logError(s"Unexpected shuffle block type ${blockId}")
+ }
+ }
Review comment:
I think in the case of shuffle files the rest is not needed: it is enough to
update the `mapOutputTracker`, and we can skip updating `blockLocations`,
which is only used for broadcast and RDD blocks.
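In code, that suggestion could look roughly like this (a sketch; the early `return true` short-circuiting the rest of the surrounding method is an assumption based on the diff context):
```scala
if (blockId.isInternalShuffle && storageLevel.isValid) {
  blockId match {
    case ShuffleIndexBlockId(shuffleId, mapId, _) =>
      // The index file alone says nothing about where the map output data
      // lives, so don't touch the map output tracker for it.
      logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}")
    case ShuffleDataBlockId(shuffleId, mapId, _) =>
      mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
    case _ =>
      logError(s"Unexpected shuffle block type ${blockId}")
  }
  // Shuffle blocks are tracked via the MapOutputTracker, so return here and
  // skip the blockLocations bookkeeping below, which only serves RDD and
  // broadcast blocks.
  return true
}
```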