peter-toth commented on PR #54330:
URL: https://github.com/apache/spark/pull/54330#issuecomment-3930668246
> The current code is definitely more Spark-native, reusing coalesceRDD, but
> my concern is the ThreadLocal and the chance of a memory leak like the one
> @viirya fixed in #51503. But I'll defer to others if people like this approach more.
As far as I can see, you assume that the child is a `DataSourceRDD`, but the main
point of this change is to move the grouping logic into the new operator
(`GroupPartitionsExec`) so that it can also be inserted into plans where there
is no `BatchScanExec` / `DataSourceRDD`, e.g. cached or checkpointed plans.
Also, even if there is a `BatchScanExec` (`DataSourceRDD`) in the plan,
`GroupPartitionsExec` is inserted right below the join / aggregate that needs
the grouping (like an exchange), so there can be other nodes / RDDs between the
`GroupPartitionsExec` and the data source. So we can't make that assumption.
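To illustrate why the grouping step does not need a `DataSourceRDD`, here is a minimal, Spark-free Scala sketch. It assumes the only input is one partition key per child partition; `PartitionGroup`, `groupPartitionsByKey` and `computeGroup` are hypothetical names, not the API of `GroupPartitionsExec` or of this PR. Each output partition just concatenates the iterators of the child partitions sharing a key, so the child could equally be a cached or checkpointed RDD.

```scala
// Hypothetical sketch: grouping only needs each child partition's key,
// so it works for any child (DataSourceRDD, cached RDD, checkpointed RDD, ...).

/** One output partition: the key it covers and the child partition indices it reads. */
final case class PartitionGroup[K](key: K, childIndices: Seq[Int])

/** Groups child partitions that share a key, keeping keys in first-seen order. */
def groupPartitionsByKey[K](childKeys: IndexedSeq[K]): Seq[PartitionGroup[K]] = {
  val byKey = childKeys.zipWithIndex.groupBy(_._1)
  // Child indices within a group are returned in ascending order.
  childKeys.distinct.map(k => PartitionGroup(k, byKey(k).map(_._2).sorted))
}

/** Reads one output partition by chaining the iterators of its child partitions. */
def computeGroup[K, T](group: PartitionGroup[K], computeChild: Int => Iterator[T]): Iterator[T] =
  group.childIndices.iterator.flatMap(computeChild)
```

For child keys `Vector("a", "b", "a", "c")` this yields three groups, with child partitions 0 and 2 read together under key `"a"`.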
> > DataSourceRDD is now back to its pre-partition-grouping form. IMO we
> > will need to backport the ThreadLocal[ReaderState] fix to previous Spark
> > versions too, so as to fix the case when there is a coalesce after the scan.
>
> btw, I didn't get this. Are you saying there is some leak in the current
> DataSourceRDD that needs a ThreadLocal to fix? Should it be fixed separately?
Not necessarily a leak, but there are some issues with custom metrics
reporting and with when the readers get closed. Consider the following plan
(without this PR):
```
...
CoalesceExec (1)
BatchScanExec (returns 2 partitions)
```
We have only 1 task in the stage due to `coalesce(1)`, and that task calls
`DataSourceRDD.compute()` for both input partitions. It doesn't matter whether
those partitions are actually grouped or not. Each invocation creates its own
reader and installs its own listener to close that reader and report its custom
metrics. But the listeners run only at the end of the task, so the first reader
is kept open for too long. What's worse, the 2 reported metrics conflict and
only one of them is kept.
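The scenario above can be modelled outside Spark with a few fake classes. `FakeTaskContext`, `FakeReader` and this `compute` are hypothetical stand-ins, not Spark's `TaskContext`, `PartitionReader` or `DataSourceRDD`; the sketch only mimics the one-reader-and-one-listener-per-`compute()` pattern described here.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a task context: listeners only run when the task ends.
final class FakeTaskContext {
  private val completionListeners = mutable.ArrayBuffer.empty[() => Unit]
  val reportedMetrics = mutable.Map.empty[String, Long]
  def addTaskCompletionListener(f: () => Unit): Unit = { completionListeners += f }
  def markTaskCompleted(): Unit = completionListeners.foreach(_())
}

// Hypothetical stand-in for a data source reader producing 3 rows per partition.
final class FakeReader(val partition: Int) {
  var isOpen = true
  var rowsRead = 0L
  def read(): Iterator[Int] = Iterator.tabulate(3) { i => rowsRead += 1; partition * 100 + i }
  def close(): Unit = { isOpen = false }
}

// Models the pattern described above: every compute() call opens its own reader
// and installs its own completion listener, which only fires at task end.
def compute(ctx: FakeTaskContext, partition: Int, opened: mutable.Buffer[FakeReader]): Iterator[Int] = {
  val reader = new FakeReader(partition)
  opened += reader
  ctx.addTaskCompletionListener { () =>
    reader.close()
    // Every listener writes the same metric name, so only the last write survives.
    ctx.reportedMetrics("rowsRead") = reader.rowsRead
  }
  reader.read()
}
```

Draining `compute(ctx, 0, opened) ++ compute(ctx, 1, opened)` inside one fake task leaves both readers open until `markTaskCompleted()` runs, and `reportedMetrics("rowsRead")` ends up as 3 even though 6 rows were read.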
So I think yes, we need to fix this issue on other branches as well. I don't
think we can/should backport this refactor to older versions, but we can
extract the `ThreadLocal[ReaderState]` logic and apply it on other branches.
Technically we could fix it separately, but as this PR hugely simplifies the
affected `DataSourceRDD`, it is easier to do it together with this refactor on
the `master` branch.
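A rough sketch of the `ThreadLocal[ReaderState]` idea, again with hypothetical fake classes rather than the code in this PR: the first `compute()` call of a task creates the per-thread state and installs a single completion listener, and each later call closes the previous reader and folds its metrics in before opening the next one. It assumes partitions are consumed one after another on the task thread, as in a coalesced task, and that the listener runs on that same thread, so it can clear the ThreadLocal; that clean-up is what addresses the leak concern quoted at the top.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; these are not Spark's actual classes.
final class FakeTaskContext {
  private val completionListeners = mutable.ArrayBuffer.empty[() => Unit]
  val reportedMetrics = mutable.Map.empty[String, Long]
  def addTaskCompletionListener(f: () => Unit): Unit = { completionListeners += f }
  def markTaskCompleted(): Unit = completionListeners.foreach(_())
}

final class FakeReader(val partition: Int) {
  var isOpen = true
  var rowsRead = 0L
  def read(): Iterator[Int] = Iterator.tabulate(3) { i => rowsRead += 1; partition * 100 + i }
  def close(): Unit = { isOpen = false }
}

/** Per-task state: the currently open reader plus metrics from readers already closed. */
final class ReaderState(val ctx: FakeTaskContext) {
  var current: Option[FakeReader] = None
  var finishedRows = 0L
  def closeCurrent(): Unit = current.foreach { r =>
    r.close()
    finishedRows += r.rowsRead
    current = None
  }
}

val readerState = new ThreadLocal[ReaderState]

def compute(ctx: FakeTaskContext, partition: Int): Iterator[Int] = {
  var state = readerState.get()
  if (state == null || (state.ctx ne ctx)) {
    state = new ReaderState(ctx)
    readerState.set(state)
    val s = state
    // One listener per task: close the last reader, report one combined metric,
    // and clear the ThreadLocal so the state does not outlive the task.
    ctx.addTaskCompletionListener { () =>
      s.closeCurrent()
      ctx.reportedMetrics("rowsRead") = s.finishedRows
      readerState.remove()
    }
  }
  // Moving on to the next input partition: the previous reader is no longer needed.
  state.closeCurrent()
  val reader = new FakeReader(partition)
  state.current = Some(reader)
  reader.read()
}
```

With two partitions read in one fake task, the first reader is closed as soon as the second `compute()` starts, the single listener reports `rowsRead = 6`, and `readerState.get()` is `null` afterwards.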
--
This is an automated message from the Apache Git Service.