agrawaldevesh commented on a change in pull request #29211:
URL: https://github.com/apache/spark/pull/29211#discussion_r464798358
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ var previousTime = Long.MaxValue
+ try {
+ bmDecomManager.start()
+ eventually(timeout(10.second), interval(10.milliseconds)) {
Review comment:
Is 10 seconds too short here?
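One option, sketched below under the assumption that the suite mixes in ScalaTest's Eventually and SpanSugar like the surrounding tests do: hoist the bound into a named constant so it is easy to lengthen if 10 seconds turns out to be tight on loaded CI machines. The constant name and the 30-second value are illustrative, not taken from this PR.

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    // Illustrative constant; pick whatever bound proves stable in CI.
    val migrationSettleTimeout = 30.seconds

    eventually(timeout(migrationSettleTimeout), interval(10.milliseconds)) {
      val (currentTime, done) = bmDecomManager.lastMigrationInfo()
      assert(done)
    }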
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
Review comment:
Can you think of a better name than 'validate'? Perhaps validate<What>, spelling out what is being validated?
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ var previousTime = Long.MaxValue
+ try {
+ bmDecomManager.start()
+ eventually(timeout(10.second), interval(10.milliseconds)) {
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done)
+ // Make sure the time stamp starts moving forward.
+ if (!fail && previousTime > currentTime) {
+ previousTime = currentTime
+ assert(false)
+ } else if (fail) {
+ assert(currentTime === Long.MaxValue)
+ }
+ }
+ if (!fail) {
+ // Wait 5 seconds and assert times keep moving forward.
+ Thread.sleep(5000)
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done && currentTime > previousTime)
+ }
+ } finally {
+ bmDecomManager.stop()
Review comment:
Just making sure I understand: is this test also intended to verify that the BM decommissioning thread cleanly exits on its own? I forget whether that was easy to check or not.
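If we do want to assert that, one possible approach (purely a sketch; the thread-name substring below is an assumption, I have not checked what the decommissioner actually names its threads) is to scan live threads after stop():

    import scala.collection.JavaConverters._

    bmDecomManager.stop()
    eventually(timeout(10.seconds), interval(100.milliseconds)) {
      // Assumed thread-name substring; adjust to whatever the decommissioner uses.
      val lingering = Thread.getAllStackTraces.keySet.asScala
        .filter(_.getName.contains("decommission"))
      assert(lingering.isEmpty, s"decommission threads still running: $lingering")
    }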
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -266,18 +266,17 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
val execIdToBlocksMapping = storageStatus.map(
status => (status.blockManagerId.executorId, status.blocks)).toMap
// No cached blocks should be present on executor which was decommissioned
- assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(),
+ assert(
+ !execIdToBlocksMapping.contains(execToDecommission) ||
+ execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(),
"Cache blocks should be migrated")
if (persist) {
// There should still be all the RDD blocks cached
assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts)
}
- // Make the executor we decommissioned exit
- sched.client.killExecutors(List(execToDecommission))
-
- // Wait for the executor to be removed
- executorRemovedSem.acquire(1)
+ // Wait for the executor to be removed automatically after migration.
+ assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES))
Review comment:
Can this timeout be shortened? 5 minutes seems a bit long. Why would it take this much time? Can we play around with some of the other configs to reduce it?
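One knob that might help, if I am reading the decommissioner right (the reattempt-interval key below is an assumption on my part, worth double-checking against the actual config constants): shrink the replication reattempt interval in the test conf so retries happen quickly, after which a much tighter bound on tryAcquire should be safe.

    import java.util.concurrent.TimeUnit
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.storage.decommission.enabled", "true")
      // Assumed key: make migration retries fast so the executor exits sooner.
      .set("spark.storage.decommission.replicationReattemptInterval", "1s")

    // With fast retries a tighter bound may be enough.
    assert(executorRemovedSem.tryAcquire(1, 60L, TimeUnit.SECONDS))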
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ var previousTime = Long.MaxValue
Review comment:
I am curious whether making this an Option[Long] would simplify the less-than/greater-than boundary checks below?
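Something along these lines is what I had in mind, reusing the names from the diff; the restructuring is only a sketch, not the exact replacement:

    // Sketch: with Option[Long] the "first observation" case is explicit and the
    // Long.MaxValue sentinel (and the previousTime > currentTime trick) goes away.
    var previousTime: Option[Long] = None

    eventually(timeout(10.seconds), interval(10.milliseconds)) {
      val (currentTime, done) = bmDecomManager.lastMigrationInfo()
      assert(done)
      if (fail) {
        assert(currentTime === Long.MaxValue)
      } else {
        // After the first observation the timestamp must never move backwards.
        previousTime.foreach(prev => assert(currentTime >= prev))
        previousTime = Some(currentTime)
      }
    }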
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ var previousTime = Long.MaxValue
+ try {
+ bmDecomManager.start()
+ eventually(timeout(10.second), interval(10.milliseconds)) {
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done)
+ // Make sure the time stamp starts moving forward.
+ if (!fail && previousTime > currentTime) {
+ previousTime = currentTime
+ assert(false)
+ } else if (fail) {
+ assert(currentTime === Long.MaxValue)
+ }
+ }
+ if (!fail) {
+ // Wait 5 seconds and assert times keep moving forward.
+ Thread.sleep(5000)
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done && currentTime > previousTime)
+ }
+ } finally {
+ bmDecomManager.stop()
+ }
+ }
+
+ test("test that with no blocks we finish migration") {
+ // Set up the mocks so we return empty
+ val bm = mock(classOf[BlockManager])
+ val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+ when(migratableShuffleBlockResolver.getStoredShuffles())
+ .thenReturn(Seq())
+ when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+ when(bm.getMigratableRDDBlocks())
+ .thenReturn(Seq())
+ when(bm.getPeers(mc.any()))
+ .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+
Review comment:
nit: extra blank line
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ var previousTime = Long.MaxValue
+ try {
+ bmDecomManager.start()
+ eventually(timeout(10.second), interval(10.milliseconds)) {
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done)
+ // Make sure the time stamp starts moving forward.
+ if (!fail && previousTime > currentTime) {
+ previousTime = currentTime
+ assert(false)
+ } else if (fail) {
+ assert(currentTime === Long.MaxValue)
+ }
+ }
+ if (!fail) {
+ // Wait 5 seconds and assert times keep moving forward.
+ Thread.sleep(5000)
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done && currentTime > previousTime)
+ }
+ } finally {
+ bmDecomManager.stop()
+ }
+ }
+
Review comment:
What about a case where new work is assigned to the executor and thus new shuffle files show up? Is that handled in the cases below? I am thinking of a scenario where we start the migration and then new shuffle output is created midway.
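If it is not covered yet, one way to mock it (a sketch only; I am assuming ShuffleBlockInfo's two-argument constructor and Mockito's chained thenReturn, as used elsewhere in this suite) is to have successive getStoredShuffles() calls return a growing set, so the decommissioner has to pick up shuffle 1 after it has already migrated shuffle 0:

    val shuffle0 = ShuffleBlockInfo(0, 0L)
    val shuffle1 = ShuffleBlockInfo(1, 0L)
    // First poll sees only shuffle 0; later polls also see shuffle 1, simulating
    // new shuffle output landing after migration has started.
    when(migratableShuffleBlockResolver.getStoredShuffles())
      .thenReturn(Seq(shuffle0))
      .thenReturn(Seq(shuffle0, shuffle1))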
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ var previousTime = Long.MaxValue
+ try {
+ bmDecomManager.start()
+ eventually(timeout(10.second), interval(10.milliseconds)) {
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done)
+ // Make sure the time stamp starts moving forward.
+ if (!fail && previousTime > currentTime) {
+ previousTime = currentTime
Review comment:
question: Can we assert here, before line 77, that previousTime == Long.MaxValue?
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+ // Verify the decom manager handles this correctly
Review comment:
This comment is not particularly informative. Can you elaborate on what it is trying to test?
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -327,4 +366,33 @@ private[storage] class BlockManagerDecommissioner(
}
logInfo("Stopped storage decommissioner")
}
+
+ /*
Review comment:
Thanks for the doc!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]