peter-toth commented on PR #54330:
URL: https://github.com/apache/spark/pull/54330#issuecomment-3930668246

   > The current code is definitely more Spark-native, reusing coalesceRDD, but 
my doubt is the threadlocal and chance for memory leak like fixed by @viirya : 
#51503 But ill defer to others, if people like this approach more.
   
   As far as I see you assume that the child is a `DataSourceRDD`, but the main 
point of this change is to move the grouping logic to the new operator 
(`GroupPartitionsExec`) so as to be able to insert it into those plans as well 
where there is no `BatchScanExec` / `DataSourceRDD`, e.g. cached or 
checkpointed plans.
   
   Also, even if there is a `BatchScanExec` (`DataSourceRDD`) in the 
plan,`GroupPartitionsExec` is inserted right below the join / aggregate where 
the grouping is needed (like an exchange), so there can be other nodes / RDDs 
between the `GroupPartitionsExec` and the data source. So we can't assume that.
   
   > > DataSourceRDD is now back to its pre partition grouping form. IMO we 
will need to backport the ThreadLocal[ReaderState] fix to previous Spark 
versions too so as to fix the case when there is a coalece after the scan.
   > 
   > btw, didn't get this, are you saying there is some leak in current 
DataSourceRDD that need ThreadLocal to fix? Should it be fixed spearately?
   
   Not necessarily a leak, but there are some issues with custom metrics 
reporting and when the readers gets closed. Consider the following plan 
(without this PR):
   ```
   ...
     CoalesceExec (1)
       BatchScanExec (returns 2 partitions)
   ```
   We have only 1 task in the stage due to coalesce(1) and that task calls the 
`DataSourceRDD.compute()` for both input partitions. It doesn't matter if those 
partitions are actually grouped or not. Both invocations create 1-1 reader and 
install 1-1 listener to close the readers and report customer metrics of the 
reader. But the listeners run only at the end of the task so the first reader 
is kept open for too long. What's worse, the 2 metrics reported conflict and 
only one will be kept.
   So I think yes, we need to fix this issue on other branches as well. I don't 
think we can/should backport this refactor to older versions, but we can 
extract the `ThreadLocal[ReaderState]` logic and apply it on other branches. 
Technically we could fix it separately, but as this PR hugely simplifies the 
affected `DataSourceRDD` it is easier to do it together with this refactor on 
the `master` branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to