mridulm commented on code in PR #44512:
URL: https://github.com/apache/spark/pull/44512#discussion_r1460207780
##########
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala:
##########
@@ -111,31 +111,50 @@ private[spark] class BlockStoreShuffleReader[K, C](
// An interruptible iterator must be used here in order to support task
cancellation
val interruptibleIter = new InterruptibleIterator[(Any, Any)](context,
metricIter)
- val aggregatedIter: Iterator[Product2[K, C]] = if
(dep.aggregator.isDefined) {
+ // Sort the output if there is a sort ordering defined.
+ var aggregated = false
+ // The type of the value cannot be determined here, maybe the type of value
+ // or the type of combined value.
+ val sortedIter: Iterator[Product2[K, Nothing]] = dep.keyOrdering match {
+ case Some(keyOrd: Ordering[K]) =>
+ // Create an ExternalSorter to sort the data.
+ val sorter: ExternalSorter[K, _, C] = if (dep.aggregator.isDefined) {
+ aggregated = true
+ if (dep.mapSideCombine) {
+ new ExternalSorter[K, C, C](context,
+ Option(new Aggregator[K, C, C](identity,
+ dep.aggregator.get.mergeCombiners,
+ dep.aggregator.get.mergeCombiners)),
+ ordering = Some(keyOrd), serializer = dep.serializer)
+ } else {
+ new ExternalSorter[K, Nothing, C](context,
+ dep.aggregator.asInstanceOf[Option[Aggregator[K, Nothing, C]]],
+ ordering = Some(keyOrd), serializer = dep.serializer)
+ }
+ } else {
+ new ExternalSorter[K, C, C](context, ordering = Some(keyOrd),
+ serializer = dep.serializer)
+ }
+
sorter.insertAllAndUpdateMetrics(interruptibleIter.asInstanceOf[Iterator[(K,
Nothing)]]).
+ asInstanceOf[Iterator[(K, Nothing)]]
+ case None =>
+ interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+ }
+
+ val resultIter: Iterator[Product2[K, C]] = if (!aggregated &&
dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
// We are reading values that are already combined
- val combinedKeyValuesIterator =
interruptibleIter.asInstanceOf[Iterator[(K, C)]]
+ val combinedKeyValuesIterator = sortedIter.asInstanceOf[Iterator[(K,
C)]]
dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator,
context)
} else {
// We don't know the value type, but also don't care -- the dependency
*should*
// have made sure its compatible w/ this aggregator, which will
convert the value
// type to the combined type C
- val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K,
Nothing)]]
+ val keyValuesIterator = sortedIter.asInstanceOf[Iterator[(K, Nothing)]]
dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
}
} else {
- interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
- }
Review Comment:
We can drop the `aggregated` flag.
In other words, we can simplify this to:
```
val resultIter = {
if (dep.keyOrdering.isDefined) {
// whatever is in the dep.keyOrdering match's "Some(keyOrd: Ordering[K])" clause.
} else if (dep.aggregator.isDefined) {
// whatever is in "if (!aggregated && dep.aggregator.isDefined) {"
} else {
interruptibleIter
}
}
```
The proposed code is effectively this, but spread across two blocks that are
tied together through the `aggregated` flag.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]