mridulm commented on code in PR #37960:
URL: https://github.com/apache/spark/pull/37960#discussion_r977119060


##########
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala:
##########
@@ -18,14 +18,11 @@ package org.apache.spark.storage
 
 import java.io.{DataOutputStream, File, FileOutputStream, IOException}
 import java.nio.file.Files
-
 import scala.concurrent.duration._
-
 import org.apache.hadoop.conf.Configuration
 import org.mockito.{ArgumentMatchers => mc}
 import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
-
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite, TestUtils}

Review Comment:
   Revert these whitespace changes.



##########
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala:
##########
@@ -107,6 +106,51 @@ class FallbackStorageSuite extends SparkFunSuite with 
LocalSparkContext {
     FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
   }
 
+  test("fallback storage APIs - readFully") {
+    val conf = new SparkConf(false)
+      .set("spark.app.id", "testId")
+      .set(SHUFFLE_COMPRESS, false)
+      .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+      .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
+        Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+    val fallbackStorage = new FallbackStorage(conf)
+    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, 
false)
+
+    val bm = mock(classOf[BlockManager])
+    val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = 
false)
+    when(bm.diskBlockManager).thenReturn(dbm)
+    when(bm.master).thenReturn(bmm)
+    val resolver = new IndexShuffleBlockResolver(conf, bm)
+    when(bm.migratableResolver).thenReturn(resolver)
+
+    val length = 100000
+    val content = new Array[Byte](length)
+    Random.nextBytes(content)
+
+    val indexFile = resolver.getIndexFile(1, 2L)
+    tryWithResource(new FileOutputStream(indexFile)) { fos =>
+      tryWithResource(new DataOutputStream(fos)) { dos =>
+        dos.writeLong(0)
+        dos.writeLong(length)
+      }
+    }
+
+    val dataFile = resolver.getDataFile(1, 2L)
+    tryWithResource(new FileOutputStream(dataFile)) { fos =>
+      tryWithResource(new DataOutputStream(fos)) { dos =>
+        dos.write(content)
+      }
+    }
+
+    fallbackStorage.copy(ShuffleBlockInfo(1, 2L), bm)
+
+    assert(fallbackStorage.exists(1, ShuffleIndexBlockId(1, 2L, 
NOOP_REDUCE_ID).name))
+    assert(fallbackStorage.exists(1, ShuffleDataBlockId(1, 2L, 
NOOP_REDUCE_ID).name))
+
+    val readResult = FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
+    assert(readResult.nioByteBuffer().array().sameElements(content))

Review Comment:
   This test is not checking for `readFully` and would work even for `read`, 
depending on whether the read ends up satisfying the request or not (We are 
relying on what the buffer size might be internally, which is subject to 
change).
   As in, the test could work even without the fix.
   



##########
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala:
##########
@@ -38,6 +35,8 @@ import org.apache.spark.shuffle.{IndexShuffleBlockResolver, 
ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.util.Utils.tryWithResource
 
+import scala.util.Random

Review Comment:
   Move to scala block above



##########
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala:
##########
@@ -107,6 +106,51 @@ class FallbackStorageSuite extends SparkFunSuite with 
LocalSparkContext {
     FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
   }
 
+  test("fallback storage APIs - readFully") {
+    val conf = new SparkConf(false)
+      .set("spark.app.id", "testId")
+      .set(SHUFFLE_COMPRESS, false)
+      .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+      .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
+        Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+    val fallbackStorage = new FallbackStorage(conf)
+    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, 
false)
+
+    val bm = mock(classOf[BlockManager])
+    val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = 
false)
+    when(bm.diskBlockManager).thenReturn(dbm)
+    when(bm.master).thenReturn(bmm)
+    val resolver = new IndexShuffleBlockResolver(conf, bm)
+    when(bm.migratableResolver).thenReturn(resolver)
+
+    val length = 100000
+    val content = new Array[Byte](length)
+    Random.nextBytes(content)
+
+    val indexFile = resolver.getIndexFile(1, 2L)
+    tryWithResource(new FileOutputStream(indexFile)) { fos =>
+      tryWithResource(new DataOutputStream(fos)) { dos =>
+        dos.writeLong(0)
+        dos.writeLong(length)
+      }
+    }
+
+    val dataFile = resolver.getDataFile(1, 2L)
+    tryWithResource(new FileOutputStream(dataFile)) { fos =>
+      tryWithResource(new DataOutputStream(fos)) { dos =>

Review Comment:
   We dont need the `DataOutputStream` here



##########
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala:
##########
@@ -107,6 +106,51 @@ class FallbackStorageSuite extends SparkFunSuite with 
LocalSparkContext {
     FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
   }
 
+  test("fallback storage APIs - readFully") {
+    val conf = new SparkConf(false)
+      .set("spark.app.id", "testId")
+      .set(SHUFFLE_COMPRESS, false)
+      .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+      .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
+        Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+    val fallbackStorage = new FallbackStorage(conf)
+    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, 
false)
+
+    val bm = mock(classOf[BlockManager])
+    val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = 
false)
+    when(bm.diskBlockManager).thenReturn(dbm)
+    when(bm.master).thenReturn(bmm)
+    val resolver = new IndexShuffleBlockResolver(conf, bm)
+    when(bm.migratableResolver).thenReturn(resolver)
+
+    val length = 100000
+    val content = new Array[Byte](length)
+    Random.nextBytes(content)
+
+    val indexFile = resolver.getIndexFile(1, 2L)
+    tryWithResource(new FileOutputStream(indexFile)) { fos =>
+      tryWithResource(new DataOutputStream(fos)) { dos =>

Review Comment:
   nit: `tryWithResource` is not required for the `DataOutputStream` - though 
that seems to be a pattern in rest of this Suite



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to