JkSelf commented on pull request #29268:
URL: https://github.com/apache/spark/pull/29268#issuecomment-671171703
@cloud-fan If we collect the row count info in `ShuffleWriteProcessor`, we
may need an additional traversal of the partition records, as in the following code:
```
private def collectRowCountInfo(
    mapId: Long,
    records: Iterator[Product2[Any, Any]],
    dep: ShuffleDependency[_, _, _],
    writeMetrics: ShuffleWriteMetricsReporter): Unit = {
  val numPartitions = dep.partitioner.numPartitions
  val rowCountInfo: Array[Long] = new Array[Long](numPartitions)
  // Extra pass over the records just to count rows per reduce partition.
  while (records.hasNext) {
    val record = records.next()
    val partitionId = dep.partitioner.getPartition(record._1)
    rowCountInfo(partitionId) += 1
  }
  writeMetrics.addMapRowCountInfo(new RowCountInfo(mapId, rowCountInfo))
}
```
But wouldn't this additional pass over the records iterator cause a
performance regression?
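
One way to avoid the second pass might be to wrap the iterator and accumulate the counts lazily as the shuffle writer consumes it. A rough sketch only: `countingIterator` is a hypothetical helper, and `RowCountInfo` / `addMapRowCountInfo` are the proposed additions from the snippet above, not existing Spark APIs:

```
import org.apache.spark.ShuffleDependency
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter

// Sketch: count rows per reduce partition while the writer drains the
// iterator, so no extra traversal of the records is needed.
private def countingIterator(
    mapId: Long,
    records: Iterator[Product2[Any, Any]],
    dep: ShuffleDependency[_, _, _],
    writeMetrics: ShuffleWriteMetricsReporter): Iterator[Product2[Any, Any]] = {
  val rowCountInfo = new Array[Long](dep.partitioner.numPartitions)
  new Iterator[Product2[Any, Any]] {
    private var reported = false

    override def hasNext: Boolean = {
      val more = records.hasNext
      // Report the counts once, after the writer has drained the iterator.
      if (!more && !reported) {
        writeMetrics.addMapRowCountInfo(new RowCountInfo(mapId, rowCountInfo))
        reported = true
      }
      more
    }

    override def next(): Product2[Any, Any] = {
      val record = records.next()
      rowCountInfo(dep.partitioner.getPartition(record._1)) += 1
      record
    }
  }
}
```

The caveat is that the counts only become available after the writer finishes, and the wrapper adds a `getPartition` call per record, so it trades the extra traversal for some per-record overhead.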