holdenk commented on a change in pull request #29211:
URL: https://github.com/apache/spark/pull/29211#discussion_r465043833



##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime = Long.MaxValue
+    try {
+      bmDecomManager.start()
+      eventually(timeout(10.second), interval(10.milliseconds)) {

Review comment:
       Sure, I'll bump it up. Given that our loop runs in under a second, I think the current timeout is pretty safe (a 10x safety factor), but let's go with 100 seconds to be extra safe.
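       For reference, the bump would look roughly like this (a sketch only, assuming the suite's existing Eventually and SpanSugar imports; only the timeout value moves from 10s to 100s):

```scala
import org.scalatest.concurrent.Eventually._ // eventually, timeout, interval
import org.scalatest.time.SpanSugar._        // 100.seconds, 10.milliseconds

// Same polling loop as in the diff above, with the timeout raised to 100
// seconds so a sub-second iteration has a very comfortable safety margin.
eventually(timeout(100.seconds), interval(10.milliseconds)) {
  val (currentTime, done) = bmDecomManager.lastMigrationInfo()
  assert(done)
}
```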

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime = Long.MaxValue
+    try {
+      bmDecomManager.start()
+      eventually(timeout(10.second), interval(10.milliseconds)) {
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done)
+        // Make sure the time stamp starts moving forward.
+        if (!fail && previousTime > currentTime) {
+          previousTime = currentTime
+          assert(false)
+        } else if (fail) {
+          assert(currentTime === Long.MaxValue)
+        }
+      }
+      if (!fail) {
+        // Wait 5 seconds and assert times keep moving forward.
+        Thread.sleep(5000)
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done && currentTime > previousTime)
+      }
+    } finally {
+      bmDecomManager.stop()

Review comment:
       No, this test does not do that; we test that in the integration suite & the k8s suite.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime = Long.MaxValue

Review comment:
       Yeah, that could work.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime = Long.MaxValue
+    try {
+      bmDecomManager.start()
+      eventually(timeout(10.second), interval(10.milliseconds)) {
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done)
+        // Make sure the time stamp starts moving forward.
+        if (!fail && previousTime > currentTime) {
+          previousTime = currentTime

Review comment:
       Sure, I'll change this around to use an Option, as you suggested up above.
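       Roughly the shape I have in mind (a hedged sketch against the same eventually loop, not the final code; it swaps the Long.MaxValue sentinel for an Option, where None means no sample has been recorded yet):

```scala
// Track the previously observed timestamp as an Option instead of
// seeding it with Long.MaxValue.
var previousTime: Option[Long] = None

eventually(timeout(100.seconds), interval(10.milliseconds)) {
  val (currentTime, done) = bmDecomManager.lastMigrationInfo()
  assert(done)
  previousTime match {
    case None =>
      // First sample: record it and fail this attempt so that eventually
      // retries and we can observe the timestamp moving forward.
      previousTime = Some(currentTime)
      assert(false, "waiting for a second sample to show progress")
    case Some(prev) =>
      // Later samples must never move backwards.
      assert(currentTime >= prev)
  }
}
```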

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+    // Verify the decom manager handles this correctly

Review comment:
       sgtm

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = {
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime = Long.MaxValue
+    try {
+      bmDecomManager.start()
+      eventually(timeout(10.second), interval(10.milliseconds)) {
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done)
+        // Make sure the time stamp starts moving forward.
+        if (!fail && previousTime > currentTime) {
+          previousTime = currentTime
+          assert(false)
+        } else if (fail) {
+          assert(currentTime === Long.MaxValue)
+        }
+      }
+      if (!fail) {
+        // Wait 5 seconds and assert times keep moving forward.
+        Thread.sleep(5000)
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done && currentTime > previousTime)
+      }
+    } finally {
+      bmDecomManager.stop()
+    }
+  }
+

Review comment:
       That's covered in the integration suite by asserting the data is migrated.
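       For anyone skimming: the helper above is driven entirely by mocks, so a caller looks roughly like this (hypothetical sketch; the Mockito usage is an assumption based on how the rest of this suite builds its mocks, and real callers stub more behavior first):

```scala
import org.apache.spark.SparkConf
import org.mockito.Mockito.mock

// Hypothetical minimal caller of validateWithMocks; the actual tests in
// this suite configure additional mock behavior before invoking it.
val bm = mock(classOf[BlockManager])
val resolver = mock(classOf[MigratableResolver])
validateWithMocks(new SparkConf(false), bm, resolver)
```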




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


