holdenk commented on a change in pull request #29211:
URL: https://github.com/apache/spark/pull/29211#discussion_r465043833
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends
SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case
we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean =
false) = {
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ var previousTime = Long.MaxValue
+ try {
+ bmDecomManager.start()
+ eventually(timeout(10.second), interval(10.milliseconds)) {
Review comment:
Sure I'll bump it up. Given that our loop is under a second though I
think it's pretty safe (10x safety factor). Let's go for 100 seconds to be safe
though.
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends
SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case
we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean =
false) = {
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ var previousTime = Long.MaxValue
+ try {
+ bmDecomManager.start()
+ eventually(timeout(10.second), interval(10.milliseconds)) {
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done)
+ // Make sure the time stamp starts moving forward.
+ if (!fail && previousTime > currentTime) {
+ previousTime = currentTime
+ assert(false)
+ } else if (fail) {
+ assert(currentTime === Long.MaxValue)
+ }
+ }
+ if (!fail) {
+ // Wait 5 seconds and assert times keep moving forward.
+ Thread.sleep(5000)
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done && currentTime > previousTime)
+ }
+ } finally {
+ bmDecomManager.stop()
Review comment:
No this test does not do that. We test that in the integration suite &
the k8s suite.
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends
SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case
we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean =
false) = {
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ var previousTime = Long.MaxValue
Review comment:
Yeah that could do that.
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends
SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case
we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean =
false) = {
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ var previousTime = Long.MaxValue
+ try {
+ bmDecomManager.start()
+ eventually(timeout(10.second), interval(10.milliseconds)) {
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done)
+ // Make sure the time stamp starts moving forward.
+ if (!fail && previousTime > currentTime) {
+ previousTime = currentTime
Review comment:
Sure, I'll change this around to use an option though as you suggested
up above.
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends
SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case
we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean =
false) = {
+ // Verify the decom manager handles this correctly
Review comment:
sgtm
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends
SparkFunSuite with Matchers {
}
}
+ /**
+ * Validate a given configuration with the mocks.
+ * The fail variable controls if we expect migration to fail, in which case
we expect
+ * a constant Long.MaxValue timestamp.
+ */
+ private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+ migratableShuffleBlockResolver: MigratableResolver, fail: Boolean =
false) = {
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ var previousTime = Long.MaxValue
+ try {
+ bmDecomManager.start()
+ eventually(timeout(10.second), interval(10.milliseconds)) {
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done)
+ // Make sure the time stamp starts moving forward.
+ if (!fail && previousTime > currentTime) {
+ previousTime = currentTime
+ assert(false)
+ } else if (fail) {
+ assert(currentTime === Long.MaxValue)
+ }
+ }
+ if (!fail) {
+ // Wait 5 seconds and assert times keep moving forward.
+ Thread.sleep(5000)
+ val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+ assert(done && currentTime > previousTime)
+ }
+ } finally {
+ bmDecomManager.stop()
+ }
+ }
+
Review comment:
That's covered in the integration suite by asserting the data is
migrated.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]