agrawaldevesh commented on a change in pull request #29211:
URL: https://github.com/apache/spark/pull/29211#discussion_r464798358



##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case 
we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = 
false) = {
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime = Long.MaxValue
+    try {
+      bmDecomManager.start()
+      eventually(timeout(10.second), interval(10.milliseconds)) {

Review comment:
       Is 10 seconds too short here?

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case 
we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,

Review comment:
       Can you think of a better name than 'validate'? Perhaps validateWhat?

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case 
we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = 
false) = {
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime = Long.MaxValue
+    try {
+      bmDecomManager.start()
+      eventually(timeout(10.second), interval(10.milliseconds)) {
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done)
+        // Make sure the time stamp starts moving forward.
+        if (!fail && previousTime > currentTime) {
+          previousTime = currentTime
+          assert(false)
+        } else if (fail) {
+          assert(currentTime === Long.MaxValue)
+        }
+      }
+      if (!fail) {
+        // Wait 5 seconds and assert times keep moving forward.
+        Thread.sleep(5000)
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done && currentTime > previousTime)
+      }
+    } finally {
+      bmDecomManager.stop()

Review comment:
       Just making sure I understand: is this test also intended to verify 
that the BM decommissioning thread exits cleanly on its own? I forget whether 
that was easy to check or not. 

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -266,18 +266,17 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
     val execIdToBlocksMapping = storageStatus.map(
       status => (status.blockManagerId.executorId, status.blocks)).toMap
     // No cached blocks should be present on executor which was decommissioned
-    
assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === 
Seq(),
+    assert(
+      !execIdToBlocksMapping.contains(execToDecommission) ||
+      execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === 
Seq(),
       "Cache blocks should be migrated")
     if (persist) {
       // There should still be all the RDD blocks cached
       assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === 
numParts)
     }
 
-    // Make the executor we decommissioned exit
-    sched.client.killExecutors(List(execToDecommission))
-
-    // Wait for the executor to be removed
-    executorRemovedSem.acquire(1)
+    // Wait for the executor to be removed automatically after migration.
+    assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES))

Review comment:
       Can this timeout be shortened? 5 minutes seems a bit long. Why would it 
take this much time? Can we tune some of the other configs to reduce 
this?

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case 
we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = 
false) = {
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime = Long.MaxValue

Review comment:
       I am curious whether making this an Option[Long] would simplify the 
less-than/greater-than boundary checks below.

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case 
we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = 
false) = {
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime = Long.MaxValue
+    try {
+      bmDecomManager.start()
+      eventually(timeout(10.second), interval(10.milliseconds)) {
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done)
+        // Make sure the time stamp starts moving forward.
+        if (!fail && previousTime > currentTime) {
+          previousTime = currentTime
+          assert(false)
+        } else if (fail) {
+          assert(currentTime === Long.MaxValue)
+        }
+      }
+      if (!fail) {
+        // Wait 5 seconds and assert times keep moving forward.
+        Thread.sleep(5000)
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done && currentTime > previousTime)
+      }
+    } finally {
+      bmDecomManager.stop()
+    }
+  }
+
+  test("test that with no blocks we finish migration") {
+    // Set up the mocks so we return empty
+    val bm = mock(classOf[BlockManager])
+    val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+    when(migratableShuffleBlockResolver.getStoredShuffles())
+      .thenReturn(Seq())
+    when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+    when(bm.getMigratableRDDBlocks())
+      .thenReturn(Seq())
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+

Review comment:
       nit: extra blank line

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case 
we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = 
false) = {
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime = Long.MaxValue
+    try {
+      bmDecomManager.start()
+      eventually(timeout(10.second), interval(10.milliseconds)) {
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done)
+        // Make sure the time stamp starts moving forward.
+        if (!fail && previousTime > currentTime) {
+          previousTime = currentTime
+          assert(false)
+        } else if (fail) {
+          assert(currentTime === Long.MaxValue)
+        }
+      }
+      if (!fail) {
+        // Wait 5 seconds and assert times keep moving forward.
+        Thread.sleep(5000)
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done && currentTime > previousTime)
+      }
+    } finally {
+      bmDecomManager.stop()
+    }
+  }
+

Review comment:
       What about a case where new work is assigned to the executor and thus 
new shuffle files show up? Is that covered by the cases below? I mean a 
scenario where we start the migration and then new shuffle work is created midway.

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case 
we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = 
false) = {
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime = Long.MaxValue
+    try {
+      bmDecomManager.start()
+      eventually(timeout(10.second), interval(10.milliseconds)) {
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done)
+        // Make sure the time stamp starts moving forward.
+        if (!fail && previousTime > currentTime) {
+          previousTime = currentTime

Review comment:
       Question: can we assert here, before line 77, that previousTime == 
Long.MaxValue?

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case 
we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = 
false) = {
+    // Verify the decom manager handles this correctly

Review comment:
       This comment is not particularly informative. Can you elaborate on what 
it is trying to test?

##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -327,4 +366,33 @@ private[storage] class BlockManagerDecommissioner(
     }
     logInfo("Stopped storage decommissioner")
   }
+
+  /*

Review comment:
       Thanks for the doc !




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to