otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r649435207
##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -22,31 +22,40 @@ import java.nio.ByteBuffer
import java.util.UUID
import java.util.concurrent.{CompletableFuture, Semaphore}
+import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import io.netty.util.internal.OutOfDirectMemoryError
import org.mockito.ArgumentMatchers.{any, eq => meq}
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.Mockito.{doThrow, mock, times, verify, when}
+import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
+import org.roaringbitmap.RoaringBitmap
import org.scalatest.PrivateMethodTester
-import org.apache.spark.{SparkFunSuite, TaskContext}
+import org.apache.spark.{MapOutputTracker, SparkFunSuite, TaskContext}
+import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
import org.apache.spark.network._
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExternalBlockStoreClient}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExternalBlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener}
import org.apache.spark.network.util.LimitedInputStream
import org.apache.spark.shuffle.{FetchFailedException, ShuffleReadMetricsReporter}
-import org.apache.spark.storage.ShuffleBlockFetcherIterator.FetchBlockInfo
+import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER
+import org.apache.spark.storage.ShuffleBlockFetcherIterator._
import org.apache.spark.util.Utils

class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
Review comment:
> a) deserialization failure results in initiating fallback.

Yes, the test `fallback to original shuffle block when a merged block chunk is corrupt` covers this. It exercises the fallback that is initiated when deserialization of the shuffle merged chunk fails while its `SuccessFetchResult` is being processed.
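For reference, this is roughly how a corrupt chunk can be simulated inside a test body with Mockito (a minimal sketch only, not the exact buffers used in the suite; the `corruptChunk` name is just illustrative):

```scala
import java.io.IOException

import org.mockito.Mockito.{doThrow, mock}

import org.apache.spark.network.buffer.ManagedBuffer

// Illustrative only: a mocked chunk buffer that fails as soon as the iterator
// tries to open it, so processing of the SuccessFetchResult hits the
// deserialization error and initiates the fallback to the original blocks.
val corruptChunk = mock(classOf[ManagedBuffer])
doThrow(new IOException("corrupt merged chunk"))
  .when(corruptChunk).createInputStream()
```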
> b) fetch failure of both merged block and fallback block should get reported to driver as fetch failure.

When the fetch of a merged block fails, the iterator falls back to fetching the original blocks that make up that merged block, so we don't report it to the driver: the task didn't fail because of it.
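To make the control flow concrete, here is a simplified sketch of the decision described above (the types and helpers are made up for illustration and are not the real `ShuffleBlockFetcherIterator` internals):

```scala
// Purely illustrative; not the real iterator code.
sealed trait Fetched
case class MergedChunk(shuffleId: Int, reduceId: Int) extends Fetched
case class OriginalBlock(shuffleId: Int, mapId: Long, reduceId: Int) extends Fetched

// Hypothetical stand-ins for the iterator's internals: in the real code the
// original blocks are resolved via the MapOutputTracker and re-queued for fetch.
def originalBlocksFor(chunk: MergedChunk): Seq[OriginalBlock] =
  Seq(OriginalBlock(chunk.shuffleId, 0L, chunk.reduceId)) // 0L is a placeholder mapId
def enqueueFetch(block: OriginalBlock): Unit = ()

def onFetchFailure(block: Fetched, cause: Throwable): Unit = block match {
  case chunk: MergedChunk =>
    // A merged block/chunk failed: fall back to its original blocks instead of
    // reporting a fetch failure to the driver.
    originalBlocksFor(chunk).foreach(enqueueFetch)
  case original: OriginalBlock =>
    // An original block failed (including while falling back): the real code
    // surfaces this as a FetchFailedException; a RuntimeException stands in here.
    throw new RuntimeException(s"fetch failed for $original", cause)
}
```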
I have added tests for the various conditions that trigger the fallback, but they simulate successful fetches of all the original blocks. I haven't yet added a test where the fallback is triggered and the iterator then fails to fetch an original block, which throws `FetchFailedException`. That path follows the existing code, but I will still add a test for it (roughly as outlined below).
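An outline of what that test could look like (`createIteratorWithFailingFallback` is a hypothetical helper standing in for the suite's usual mock setup of the block manager and transfer service):

```scala
test("fetch failure of an original block during fallback throws FetchFailedException") {
  // Hypothetical helper: configures the mocks so that fetching the merged
  // chunk fails (triggering the fallback) and fetching one of the original
  // blocks also fails.
  val iterator = createIteratorWithFailingFallback()
  intercept[FetchFailedException] {
    iterator.next()
  }
}
```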