Github user massie commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6423#discussion_r32242149
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala ---
    @@ -33,23 +34,55 @@ private[spark] class HashShuffleReader[K, C](
         "Hash shuffle currently only supports fetching one partition")
     
       private val dep = handle.dependency
    +  private val blockManager = SparkEnv.get.blockManager
     
       /** Read the combined key-values for this reduce task */
       override def read(): Iterator[Product2[K, C]] = {
    +    val blockStreams = BlockStoreShuffleFetcher.fetchBlockStreams(
    +      handle.shuffleId, startPartition, context)
    +
    +    // Wrap the streams for compression based on configuration
    +    val wrappedStreams = blockStreams.map { case (blockId, inputStream) =>
    +      blockManager.wrapForCompression(blockId, inputStream)
    +    }
    +
         val ser = Serializer.getSerializer(dep.serializer)
    -    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
    +    val serializerInstance = ser.newInstance()
    +
    +    // Create a key/value iterator for each stream
    +    val recordIter = wrappedStreams.flatMap { wrappedStream =>
    +      serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
    +    }
    +
    +    // Update the context task metrics for each record read.
    +    val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
    +    val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
    +      recordIter.map(record => {
    +        readMetrics.incRecordsRead(1)
    +        record
    +      }),
    +      context.taskMetrics().updateShuffleReadMetrics())
    +
    +    // An interruptible iterator must be used here in order to support task cancellation
    +    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
     
         val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
           if (dep.mapSideCombine) {
    -        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
    +        // We are reading values that are already combined
    +        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
    +        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
           } else {
    -        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
    +        // We don't know the value type, but also don't care -- the dependency *should*
    +        // have made sure its compatible w/ this aggregator, which will convert the value
    +        // type to the combined type C
    +        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
    +        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
           }
         } else {
           require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
     
           // Convert the Product2s to pairs since this is what downstream RDDs currently expect
    -      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    +      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    --- End diff --
    
    The Scala compiler treats `( ..., ...)` as an alias for Tuple*n*. Tuples 
extend Products, e.g. `Tuple2` extends `Product2` and adds `toString()` and 
`swap()` methods.
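
To make the relationship concrete, here is a minimal standalone sketch (plain Scala, no Spark; the object and value names are made up for illustration). It only shows that the tuple literal and `Tuple2` are the same thing and that a `Tuple2` can be used wherever a `Product2` is expected:

```scala
object TupleSugarSketch {
  // The literal (1, "a") is sugar for Tuple2(1, "a").
  val sugared: (Int, String) = (1, "a")
  val explicit: Tuple2[Int, String] = Tuple2(1, "a")

  // Tuple2 extends Product2, so this upcast needs no conversion.
  val asProduct: Product2[Int, String] = sugared

  def main(args: Array[String]): Unit = {
    println(sugared == explicit) // the two spellings build equal values
    println(asProduct._1)        // Product2 exposes _1 and _2
    println(sugared.swap)        // swap is available on Tuple2, not on Product2
  }
}
```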
    
    The signature of `read()` is `def read(): Iterator[Product2[K, C]]`, so
whatever we create, be it `Tuple2`/`(...)` or `Product2`, will be returned as a
`Product2`. I don't understand why the `map()` is needed here, since it
converts a `Product2` to a `Tuple2`, which is then returned as a `Product2`. I think
we should drop the map.
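
As a rough illustration of the point, the sketch below compares the two options outside of Spark. `Pair`, `withMap`, and `withoutMap` are hypothetical names invented for this example, and `Pair` is only an assumed stand-in for a `Product2` that is not a tuple. The static return type is identical either way; the only difference I can see is the runtime class of the elements, which would matter only if downstream code casts to `Tuple2`, as the existing code comment seems to suggest.

```scala
object DropMapSketch {
  // Hypothetical stand-in for a Product2 implementation that is not a Tuple2.
  final class Pair[A, B](val _1: A, val _2: B) extends Product2[A, B] {
    override def canEqual(that: Any): Boolean = that.isInstanceOf[Pair[_, _]]
  }

  // Same shape as the line under review: copy each element into a tuple.
  def withMap[K, C](it: Iterator[Product2[K, C]]): Iterator[Product2[K, C]] =
    it.map(pair => (pair._1, pair._2))

  // The proposed alternative: return the iterator unchanged.
  def withoutMap[K, C](it: Iterator[Product2[K, C]]): Iterator[Product2[K, C]] = it

  def main(args: Array[String]): Unit = {
    val input = List[Product2[String, Int]](new Pair("a", 1), ("b", 2))
    val mapped = withMap(input.iterator).toList
    val passed = withoutMap(input.iterator).toList

    // Both variants expose the same keys and values through Product2.
    println(mapped.map(p => (p._1, p._2)) == passed.map(p => (p._1, p._2)))

    // Only the runtime class can differ: withMap always yields Tuple2 instances.
    println(mapped.forall(_.isInstanceOf[Tuple2[_, _]]))
    println(passed.forall(_.isInstanceOf[Tuple2[_, _]]))
  }
}
```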
    
    While the syntactic sugar for tuples looks nice, I think we should
keep the `Product2` notation throughout, since that is what `read()` is
returning (for now).

