Github user viirya commented on the pull request:
https://github.com/apache/spark/pull/2217#issuecomment-53983398
Thanks for your comments. Regarding the possibility of an exception on an
executor: it can only happen when `synchronized` is absent. Since `setValue` is
wrapped in `TorrentBroadcast.synchronized`, I think the exception caused by the
race between `doDestroy` and `putSingle` is resolved.
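As a rough illustration in plain Scala (hypothetical names, not the actual `TorrentBroadcast` code), guarding both the destroy path and the write path with the same lock is what prevents the race:

```scala
// Minimal sketch (hypothetical names) of why wrapping the write path in the
// same lock as the destroy path avoids the race described above.
object TorrentBroadcastSketch {
  private var value: Option[Any] = Some("initial")

  // Analogous to `setValue` -> `putSingle`: replace the stored value.
  def setValue(v: Any): Unit = this.synchronized {
    value = Some(v)
  }

  // Analogous to `doDestroy`: drop the stored value.
  def destroy(): Unit = this.synchronized {
    value = None
  }

  // Without the shared lock, a reader could observe the value half-removed
  // while `setValue` is concurrently re-inserting it.
  def get: Any = this.synchronized {
    value.getOrElse(throw new IllegalStateException("destroyed"))
  }
}
```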
I would like to know whether the immutability of broadcast variables is
necessary. Re-broadcasting would not happen, because the new value immediately
replaces the old value in the broadcast variable, so the variable will not
request the corresponding block from other remote executors' or the driver's
`BlockManager`.
In fact, this PR is intended to reduce re-broadcasting when running some
algorithms. For some algorithms, we may calculate a parameter in the first step
with `mapPartitions` on each partition of data. In later steps, the parameter
can then be reused without re-calculation if we can keep it. If we instead
collect the calculated values back from all tasks and broadcast them again in
later steps, this incurs redundant communication cost.
So I would like to make broadcast variables mutable on each executor, so that
later steps of such algorithms can reuse those calculated values.
Suggestions are welcome; maybe there is a better solution for this.
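The pattern described above could be sketched roughly as follows (a sketch, assuming the mutable `setValue` API proposed in this PR; the per-partition calculation is hypothetical):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: assumes the mutable-broadcast `setValue` from this PR.
val sc = new SparkContext(new SparkConf().setAppName("mutable-broadcast-sketch"))
val data = sc.parallelize(1 to 1000000, numSlices = 8)

// Step 1: compute a parameter on each partition and keep it in the
// broadcast variable on that executor via the proposed `setValue`,
// instead of collecting it back to the driver.
val param = sc.broadcast(0.0)
data.mapPartitions { iter =>
  val local = iter.map(_.toDouble).sum // the expensive calculation
  param.setValue(local)                // proposed executor-local mutation
  Iterator(local)
}.count()

// Later steps reuse `param.value` on the same executors, with no
// collect-and-rebroadcast round trip through the driver.
val result = data.mapPartitions { iter =>
  iter.map(_ * param.value)
}
```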
I just found that calling `doDestroy` would remove remote blocks, which is not
what I want, so I added another commit that removes the blocks locally instead.