Hisoka-X commented on code in PR #41958:
URL: https://github.com/apache/spark/pull/41958#discussion_r1263581572


##########
core/src/main/scala/org/apache/spark/Dependency.scala:
##########
@@ -76,7 +75,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  */
 @DeveloperApi
 class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
-    @transient private val _rdd: RDD[_ <: Product2[K, V]],
+    private val _rdd: RDD[_ <: Product2[K, V]],

Review Comment:
   We need to ship the rdd in `ShuffleDependency` to the executor node so that
   all metrics can be registered there. Take `ObjectHashAggregateExec` as an
   example. The problem was not exposed before because `ObjectHashAggregateExec`
   shipped a lambda as part of the task information. That lambda captured the
   `ObjectHashAggregateExec` object, which in turn references its child
   `ObjectHashAggregateExec`. (Because of the partitioning operator,
   `ObjectHashAggregateExec` is split into two stages, and the metrics we lose
   are those of the child `ObjectHashAggregateExec`.) The child's metrics
   therefore travelled to the executor node together with the lambda. Now that
   the logic has been extracted into the `EvaluatorFactory`, the lambda carries
   only the `EvaluatorFactory` object, so the metrics of the child
   `ObjectHashAggregateExec` are no longer shipped, and some metrics are lost
   on the executor side.
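   
   The serialization behaviour behind this can be sketched in isolation. The
   example below is only an illustration under stated assumptions: `Metrics`,
   `WithTransient`, `WithoutTransient` and `TransientDemo` are hypothetical
   stand-ins, not Spark classes, and plain Java serialization is used in place
   of Spark's task serialization.
   
   ```scala
   import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

   // Hypothetical stand-in for the metrics held by a plan node (not a Spark class).
   class Metrics(val name: String) extends Serializable

   // Hypothetical holders: one marks the field @transient, the other does not.
   class WithTransient(@transient val metrics: Metrics) extends Serializable
   class WithoutTransient(val metrics: Metrics) extends Serializable

   object TransientDemo {
     // Serialize and deserialize a value with plain Java serialization.
     def roundTrip[T <: AnyRef](value: T): T = {
       val buffer = new ByteArrayOutputStream()
       val out = new ObjectOutputStream(buffer)
       out.writeObject(value)
       out.close()
       val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
       try in.readObject().asInstanceOf[T] finally in.close()
     }

     def main(args: Array[String]): Unit = {
       val metrics = new Metrics("numOutputRows")
       // The @transient field is skipped during serialization, so it comes back as null.
       println(roundTrip(new WithTransient(metrics)).metrics == null)
       // Without @transient the referenced object is serialized along with its holder.
       println(roundTrip(new WithoutTransient(metrics)).metrics.name)
     }
   }
   ```
   
   In this sketch a referenced object reaches the receiving side only if some
   non-transient reference chain leads to it, which is the role the captured
   lambda used to play for the child metrics.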
   
   The lambda data transferred after the `EvaluatorFactory` change:
   
![image](https://github.com/apache/spark/assets/32387433/e883c7de-d6ea-458e-8e68-f58c7c6d9711)
   
   The lambda data transferred before the `EvaluatorFactory` change:
   
![image](https://github.com/apache/spark/assets/32387433/6d333f45-0e46-4c0c-a978-2352cd099064)
   
   This is the information we need:
   
![image](https://github.com/apache/spark/assets/32387433/3735a487-3088-4867-a7ce-117eccb72398)
   
   The physical plan for `ObjectHashAggregate`:
   ```log
   ObjectHashAggregate(keys=[a#3], functions=[collect_set(a#3, 0, 0)], output=[a#3, collect_set(a)#67])
   +- Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [plan_id=62]
      +- ObjectHashAggregate(keys=[a#3], functions=[partial_collect_set(a#3, 0, 0)], output=[a#3, buf#71])
         +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3]
            +- Scan[obj#2]
   ```
   
   cc @cloud-fan @MaxGekk @viirya 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

