Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/6423#discussion_r32256571
--- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala ---
@@ -33,23 +34,55 @@ private[spark] class HashShuffleReader[K, C](
     "Hash shuffle currently only supports fetching one partition")
 
   private val dep = handle.dependency
+  private val blockManager = SparkEnv.get.blockManager
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
+    val blockStreams = BlockStoreShuffleFetcher.fetchBlockStreams(
+      handle.shuffleId, startPartition, context)
+
+    // Wrap the streams for compression based on configuration
+    val wrappedStreams = blockStreams.map { case (blockId, inputStream) =>
+      blockManager.wrapForCompression(blockId, inputStream)
+    }
+
     val ser = Serializer.getSerializer(dep.serializer)
-    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
+    val serializerInstance = ser.newInstance()
+
+    // Create a key/value iterator for each stream
+    val recordIter = wrappedStreams.flatMap { wrappedStream =>
+      serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
+    }
+
+    // Update the context task metrics for each record read.
+    val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
+    val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
+      recordIter.map(record => {
+        readMetrics.incRecordsRead(1)
+        record
+      }),
+      context.taskMetrics().updateShuffleReadMetrics())
+
+    // An interruptible iterator must be used here in order to support task cancellation
+    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
 
     val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
-        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
+        // We are reading values that are already combined
+        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
+        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
       } else {
-        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
+        // We don't know the value type, but also don't care -- the dependency *should*
+        // have made sure its compatible w/ this aggregator, which will convert the value
+        // type to the combined type C
+        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
      }
     } else {
       require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
       // Convert the Product2s to pairs since this is what downstream RDDs currently expect
-      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
+      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
--- End diff --
I understand the relationship between `Product2` and `Tuple2`. My point
was that the signature of `read()` is **wrong** given how we actually use it.
If the interface is `def read(): Iterator[Product2[K, C]]`, that suggests
that implementors can return _any_ `Product2[K, C]`. That is, in addition to
returning a `(K, C)`, they should also be allowed to return a `case class
MutableTuple2[K, C](var key: K, var value: C)`. OTOH, callers of the method
should _not_ make any assumptions about the specific implementation of
`Product2` -- they must be ready to accept any implementation.
But that's not what we're actually doing. The caller (`ShuffledRDD`) is
[assuming a specific subtype of `Product2` -- `Tuple2`, to be
exact](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L91).
Instead of having the type system check this for us, we're stuck keeping
track of it manually, which leads to extra casts & comments like the one above.
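To make the point concrete, here is a minimal, hypothetical sketch (the
`CurrentReader`/`ProposedReader` trait names and the `consume` helper are made
up purely for illustration; none of this is code from the PR):

```scala
object Product2SignatureSketch {

  // A perfectly legal Product2[K, C] implementation that is *not* a Tuple2.
  class MutableTuple2[K, C](var key: K, var value: C) extends Product2[K, C] {
    override def _1: K = key
    override def _2: C = value
    override def canEqual(that: Any): Boolean = that.isInstanceOf[MutableTuple2[_, _]]
  }

  // What the interface promises today (simplified).
  trait CurrentReader[K, C] {
    def read(): Iterator[Product2[K, C]]
  }

  // A caller that assumes Tuple2 -- effectively what ShuffledRDD's asInstanceOf does --
  // compiles fine, but throws a MatchError at runtime the moment a reader actually
  // returns MutableTuple2 elements.
  def consume[K, C](reader: CurrentReader[K, C]): Unit =
    reader.read().foreach { case (k, c) => println(s"$k -> $c") }

  // Tightening the signature lets the compiler enforce what callers already assume,
  // and the asInstanceOf casts (plus the comments explaining them) go away.
  trait ProposedReader[K, C] {
    def read(): Iterator[(K, C)]
  }
}
```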
Again, this specific point has nothing to do with your PR; I just wanted to
bring it up while we're all looking at this code. @JoshRosen @kayousterhout any
thoughts? If you agree with my understanding here, I'll open a separate issue
to change the types of `ShuffleReader.read` and `ExternalSorter.iterator`
accordingly and eliminate some casts.