mridulm commented on code in PR #37960:
URL: https://github.com/apache/spark/pull/37960#discussion_r977119060
##########
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala:
##########
@@ -18,14 +18,11 @@ package org.apache.spark.storage
import java.io.{DataOutputStream, File, FileOutputStream, IOException}
import java.nio.file.Files
-
import scala.concurrent.duration._
-
import org.apache.hadoop.conf.Configuration
import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
-
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext,
SparkFunSuite, TestUtils}
Review Comment:
Revert these whitespace changes.
##########
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala:
##########
@@ -107,6 +106,51 @@ class FallbackStorageSuite extends SparkFunSuite with
LocalSparkContext {
FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
}
+ test("fallback storage APIs - readFully") {
+ val conf = new SparkConf(false)
+ .set("spark.app.id", "testId")
+ .set(SHUFFLE_COMPRESS, false)
+ .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+ .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
+ Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+ val fallbackStorage = new FallbackStorage(conf)
+ val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf,
false)
+
+ val bm = mock(classOf[BlockManager])
+ val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver =
false)
+ when(bm.diskBlockManager).thenReturn(dbm)
+ when(bm.master).thenReturn(bmm)
+ val resolver = new IndexShuffleBlockResolver(conf, bm)
+ when(bm.migratableResolver).thenReturn(resolver)
+
+ val length = 100000
+ val content = new Array[Byte](length)
+ Random.nextBytes(content)
+
+ val indexFile = resolver.getIndexFile(1, 2L)
+ tryWithResource(new FileOutputStream(indexFile)) { fos =>
+ tryWithResource(new DataOutputStream(fos)) { dos =>
+ dos.writeLong(0)
+ dos.writeLong(length)
+ }
+ }
+
+ val dataFile = resolver.getDataFile(1, 2L)
+ tryWithResource(new FileOutputStream(dataFile)) { fos =>
+ tryWithResource(new DataOutputStream(fos)) { dos =>
+ dos.write(content)
+ }
+ }
+
+ fallbackStorage.copy(ShuffleBlockInfo(1, 2L), bm)
+
+ assert(fallbackStorage.exists(1, ShuffleIndexBlockId(1, 2L,
NOOP_REDUCE_ID).name))
+ assert(fallbackStorage.exists(1, ShuffleDataBlockId(1, 2L,
NOOP_REDUCE_ID).name))
+
+ val readResult = FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
+ assert(readResult.nioByteBuffer().array().sameElements(content))
Review Comment:
This test is not checking for `readFully` and would work even for `read`,
depending on whether the read ends up satisfying the request or not (We are
relying on what the buffer size might be internally, which is subject to
change).
As in, the test could work even without the fix.
##########
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala:
##########
@@ -38,6 +35,8 @@ import org.apache.spark.shuffle.{IndexShuffleBlockResolver,
ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.util.Utils.tryWithResource
+import scala.util.Random
Review Comment:
Move this import to the `scala.*` import group above (imports should be grouped: java, scala, third-party, spark).
##########
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala:
##########
@@ -107,6 +106,51 @@ class FallbackStorageSuite extends SparkFunSuite with
LocalSparkContext {
FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
}
+ test("fallback storage APIs - readFully") {
+ val conf = new SparkConf(false)
+ .set("spark.app.id", "testId")
+ .set(SHUFFLE_COMPRESS, false)
+ .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+ .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
+ Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+ val fallbackStorage = new FallbackStorage(conf)
+ val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf,
false)
+
+ val bm = mock(classOf[BlockManager])
+ val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver =
false)
+ when(bm.diskBlockManager).thenReturn(dbm)
+ when(bm.master).thenReturn(bmm)
+ val resolver = new IndexShuffleBlockResolver(conf, bm)
+ when(bm.migratableResolver).thenReturn(resolver)
+
+ val length = 100000
+ val content = new Array[Byte](length)
+ Random.nextBytes(content)
+
+ val indexFile = resolver.getIndexFile(1, 2L)
+ tryWithResource(new FileOutputStream(indexFile)) { fos =>
+ tryWithResource(new DataOutputStream(fos)) { dos =>
+ dos.writeLong(0)
+ dos.writeLong(length)
+ }
+ }
+
+ val dataFile = resolver.getDataFile(1, 2L)
+ tryWithResource(new FileOutputStream(dataFile)) { fos =>
+ tryWithResource(new DataOutputStream(fos)) { dos =>
Review Comment:
We don't need the `DataOutputStream` here
##########
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala:
##########
@@ -107,6 +106,51 @@ class FallbackStorageSuite extends SparkFunSuite with
LocalSparkContext {
FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
}
+ test("fallback storage APIs - readFully") {
+ val conf = new SparkConf(false)
+ .set("spark.app.id", "testId")
+ .set(SHUFFLE_COMPRESS, false)
+ .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+ .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
+ Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+ val fallbackStorage = new FallbackStorage(conf)
+ val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf,
false)
+
+ val bm = mock(classOf[BlockManager])
+ val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver =
false)
+ when(bm.diskBlockManager).thenReturn(dbm)
+ when(bm.master).thenReturn(bmm)
+ val resolver = new IndexShuffleBlockResolver(conf, bm)
+ when(bm.migratableResolver).thenReturn(resolver)
+
+ val length = 100000
+ val content = new Array[Byte](length)
+ Random.nextBytes(content)
+
+ val indexFile = resolver.getIndexFile(1, 2L)
+ tryWithResource(new FileOutputStream(indexFile)) { fos =>
+ tryWithResource(new DataOutputStream(fos)) { dos =>
Review Comment:
nit: `tryWithResource` is not required for the `DataOutputStream` — though
that seems to be a pattern in the rest of this suite
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]