Github user kayousterhout commented on a diff in the pull request:
https://github.com/apache/spark/pull/6423#discussion_r32173214
--- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala ---
@@ -33,17 +34,52 @@ private[spark] class HashShuffleReader[K, C](
     "Hash shuffle currently only supports fetching one partition")
 
   private val dep = handle.dependency
+  private val blockManager = SparkEnv.get.blockManager
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
+    val blockStreams = BlockStoreShuffleFetcher.fetchBlockStreams(
+      handle.shuffleId, startPartition, context)
+
+    // Wrap the streams for compression based on configuration
+    val wrappedStreams = blockStreams.map { case (blockId, inputStream) =>
+      blockManager.wrapForCompression(blockId, inputStream)
+    }
+
     val ser = Serializer.getSerializer(dep.serializer)
-    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
+    val serializerInstance = ser.newInstance()
+
+    // Create a key/value iterator for each stream
+    val recordIterator = wrappedStreams.flatMap { wrappedStream =>
+      serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
+    }
+
+    val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
+    // Update read metrics for each record materialized
+    val metricIter = new InterruptibleIterator[(Any, Any)](context, recordIterator) {
--- End diff --
This commit makes a lot of nice changes that improve the readability of this
section of code, and I have one more suggestion here. I realize it isn't a
direct consequence of your change, but it seems like something we might as
well do as part of this, if you don't mind.
One thing that gets lost here is the fact that we NEED to use an
interruptible iterator to support task cancellation (so we can stop the
iterator in the middle). That's very non-obvious right now -- it looks like
the interruptible iterator exists for the metrics. What about making a single
metricsIterator that is a CompletionIterator (to get the
updateShuffleReadMetrics() call) and that also overrides next() (to get the
readMetrics.incRecordsRead(1) call)? With that change, all of the metrics
bookkeeping would conveniently happen in a single iterator.
Then we could wrap that in an interruptible iterator (just called
interruptibleIterator or something), with a comment saying that we wrap the
end product in an interruptible iterator so that the task can be cancelled
partway through.
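
Roughly, I'm imagining something like the following (an untested sketch, not
the final implementation; it assumes the CompletionIterator helper in
org.apache.spark.util, and recordIterator is the flatMapped iterator from the
diff above):

    val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()

    // One iterator that does all of the metrics bookkeeping: increment
    // recordsRead on every next(), and merge the per-dependency read metrics
    // into the task metrics once the underlying iterator is exhausted.
    val metricsIterator =
      new CompletionIterator[(Any, Any), Iterator[(Any, Any)]](recordIterator) {
        override def next(): (Any, Any) = {
          readMetrics.incRecordsRead(1)
          super.next()
        }
        override def completion(): Unit =
          context.taskMetrics.updateShuffleReadMetrics()
      }

    // We wrap the end product in an interruptible iterator so that the task
    // can be cancelled partway through consuming the shuffle data.
    val interruptibleIterator =
      new InterruptibleIterator[(Any, Any)](context, metricsIterator)

That way the InterruptibleIterator is clearly there for cancellation, and all
of the metrics logic lives in one place.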