Hisoka-X commented on code in PR #41958:
URL: https://github.com/apache/spark/pull/41958#discussion_r1263581572
##########
core/src/main/scala/org/apache/spark/Dependency.scala:
##########
@@ -76,7 +75,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  */
 @DeveloperApi
 class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
-    @transient private val _rdd: RDD[_ <: Product2[K, V]],
+    private val _rdd: RDD[_ <: Product2[K, V]],
Review Comment:
We need to ship the RDD in `ShuffleDependency` to the executor node so that all metrics can be registered there. Take `ObjectHashAggregateExec` as an example: this problem was not exposed before because the task used to be transmitted as a lambda. That lambda captured the `ObjectHashAggregateExec` object, and that exec in turn references a child `ObjectHashAggregateExec` (the partitioning operator splits `ObjectHashAggregateExec` into two stages, and the metrics we lose are those of the child `ObjectHashAggregateExec`). The child exec's metrics therefore traveled to the executor node along with the serialized lambda. Now that the logic has been extracted into `EvaluatorFactory`, the lambda carries only the `EvaluatorFactory` object, so the child `ObjectHashAggregateExec`'s metrics are dropped and some metrics are lost on the executor side (see the serialization sketch below).
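To make the mechanics concrete, here is a minimal sketch of why `@transient` matters here. It uses plain Java serialization and a hypothetical `Holder` class (none of this is Spark code): a `@transient` field is skipped during serialization and comes back as `null` on the other side, which is the same reason a `@transient _rdd`, and any metrics reachable from it, would never arrive on the executor.

```scala
import java.io._

// Hypothetical class, not Spark code: the @transient field is skipped by
// Java serialization, so it arrives as null after a round trip.
class Holder(@transient val payload: String) extends Serializable

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(new Holder("metrics reachable from here"))
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    val restored = in.readObject().asInstanceOf[Holder]
    // Prints "null": the transient field never shipped, just as a
    // @transient _rdd (and anything reachable from it) would not be
    // available on the executor.
    println(restored.payload)
  }
}
```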
The lambda data transferred after the `EvaluatorFactory` change (screenshot omitted):

The lambda data transferred before `EvaluatorFactory` (screenshot omitted):

This is the information we need.
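For intuition on why the lambda used to carry the child exec's metrics, here is a hedged sketch with hypothetical classes (none of them are Spark's real ones): a function literal that reads a field closes over `this`, so serializing it serializes the entire enclosing object graph, child metrics included, whereas a standalone factory carries only the fields it was explicitly handed.

```scala
class Metric(val name: String) extends Serializable

class FakeExec(
    val child: Option[FakeExec],
    val metrics: Map[String, Metric]) extends Serializable {

  // This function literal reads a field, so it closes over `this`.
  // Serializing the returned function serializes the whole FakeExec tree,
  // and the child's metrics ride along to the executor.
  def closureTask: Iterator[Int] => Iterator[Int] = { it =>
    require(metrics != null) // touching a field forces capture of `this`
    it
  }
}

// A standalone factory captures only what it is explicitly given; the
// enclosing exec (and its child's metrics) stays behind on the driver.
class FakeEvaluatorFactory(ownMetrics: Map[String, Metric]) extends Serializable {
  def createEvaluator(): Iterator[Int] => Iterator[Int] = it => it
}
```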

The physical plan for `ObjectHashAggregate`:
```log
ObjectHashAggregate(keys=[a#3], functions=[collect_set(a#3, 0, 0)], output=[a#3, collect_set(a)#67])
+- Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [plan_id=62]
   +- ObjectHashAggregate(keys=[a#3], functions=[partial_collect_set(a#3, 0, 0)], output=[a#3, buf#71])
      +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3]
         +- Scan[obj#2]
```
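For reference, a query along these lines should produce a plan of this shape. This is a hypothetical repro, not the PR's actual test; the `TestData2` case class here stands in for the `SQLTestData$TestData2` type visible in the `SerializeFromObject` node.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_set

// Rows with columns (a, b), mirroring the TestData2 shape in the plan above.
case class TestData2(a: Int, b: Int)

object Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val ds = Seq(TestData2(1, 1), TestData2(1, 2), TestData2(2, 1)).toDS()
    // collect_set is a TypedImperativeAggregate, so by default it plans as
    // two ObjectHashAggregate stages separated by a hash-partitioning
    // Exchange, as in the plan above.
    val df = ds.groupBy($"a").agg(collect_set($"a"))
    df.explain()
    spark.stop()
  }
}
```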
cc @cloud-fan @MaxGekk @viirya