Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/23207#discussion_r239698273
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
---
@@ -333,8 +343,19 @@ object ShuffleExchangeExec {
new ShuffleDependency[Int, InternalRow, InternalRow](
rddWithPartitionIds,
new PartitionIdPassthrough(part.numPartitions),
- serializer)
+ serializer,
+ shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
dependency
}
+
+ /**
+ * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter
+ * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]].
+ */
+ def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = {
+ (reporter: ShuffleWriteMetricsReporter) => {
--- End diff --
Yes, this lambda form doesn't work with Scala 2.11, and it should be written
in a more readable way anyway. Done in 6378a3d.
---
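For readers following the thread, here is a minimal sketch of the kind of rewrite being discussed. Scala 2.12 converts a function literal to a single-abstract-method (SAM) type by default, while Scala 2.11 does not, so an explicit anonymous class is the form that compiles on both. The names `MetricsReporter`, `WriteProcessor`, `CountingReporter`, and `createProcessor` are hypothetical stand-ins invented for this illustration; they are not Spark's classes and this is not necessarily what commit 6378a3d contains.

```scala
// Hypothetical stand-in for a metrics reporter (not Spark's API).
trait MetricsReporter {
  def incRecordsWritten(n: Long): Unit
}

// Hypothetical single-abstract-method (SAM) type (not Spark's API).
trait WriteProcessor {
  def createMetricsReporter(base: MetricsReporter): MetricsReporter
}

object SamExample {
  // Wraps a base reporter: keeps a local count and forwards each update.
  final class CountingReporter(base: MetricsReporter) extends MetricsReporter {
    var recordsWritten: Long = 0L
    override def incRecordsWritten(n: Long): Unit = {
      recordsWritten += n
      base.incRecordsWritten(n)
    }
  }

  // Relies on SAM conversion, which Scala 2.12+ enables by default:
  // val processor: WriteProcessor =
  //   (base: MetricsReporter) => new CountingReporter(base)

  // Explicit anonymous class: compiles on both Scala 2.11 and 2.12.
  def createProcessor(): WriteProcessor = new WriteProcessor {
    override def createMetricsReporter(base: MetricsReporter): MetricsReporter =
      new CountingReporter(base)
  }
}
```

The explicit form is longer, but it names the overridden method at the definition site, which is arguably easier to read than a bare lambda.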