Github user ahmed-mahran commented on the issue:
https://github.com/apache/spark/pull/13866
@zsxwing, it's great that you pointed out unallocated blocks; I had
missed those.
**What to clean?**
So, there are three kinds of stored blocks here: *unallocated stream
blocks*, *allocated stream blocks*, and *other RDD blocks*. Here is how each is
currently cleared, given that the `spark.streaming.unpersist` flag is set (not
quite sure though; this needs expert confirmation):
1. *RDD blocks:* after each batch, old RDDs are unpersisted; for the last
batch, however, the last remembered window of blocks is left to the context
cleaner
2. *Allocated stream blocks:* after each batch, blocks of old `BlockRDD`s
are removed; for the last batch, however, the last remembered window of blocks
is left orphaned and out of the context cleaner's scope
3. *Unallocated stream blocks:* before each batch, blocks get allocated
(so the *allocated stream blocks* cleaning procedure applies); for the last
batch, however, unallocated blocks are left orphaned and out of the context
cleaner's scope
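To make the leftover cases concrete, here is a small self-contained Scala model of the stream-block lifecycle. All names and the batch/remember mechanics are simplified assumptions for illustration, not Spark's actual classes:

```scala
import scala.collection.mutable

// Illustrative model (not Spark code): stream blocks are received as
// unallocated, assigned to a batch on allocation, and removed only when
// the batch that owns them falls out of the remembered window.
object BlockLifecycleModel {
  val unallocated = mutable.Queue.empty[String]
  val allocated = mutable.Map.empty[Int, Seq[String]] // batch -> its blocks

  def receive(blocks: Seq[String]): Unit = unallocated ++= blocks

  // Allocation drains the unallocated queue into the given batch.
  def allocateBatch(batch: Int): Unit =
    allocated(batch) = unallocated.dequeueAll(_ => true).toSeq

  // Per-batch cleanup: drop blocks of batches older than the remembered
  // window; the last window and never-allocated blocks are never touched.
  def cleanup(currentBatch: Int, rememberBatches: Int): Unit =
    allocated.keys.filter(_ <= currentBatch - rememberBatches)
      .toSeq.foreach(allocated.remove)
}
```

Running the model to the last batch shows both leaks: the last remembered window stays in `allocated`, and anything received after the final allocation stays in `unallocated`, with no further cleanup pass to claim either.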
**When to clean?**
Cleaning of unallocated blocks shouldn't happen while there is still time
for batches to be allocated, and cleaning of all blocks should happen only
after the last batch is done or times out. Given the sequence in which the
involved components are stopped:
1. `ReceivedBlockTracker` stops receiving new blocks; it cannot clear
unallocated blocks because there is still time for batches to be allocated
2. `ReceiverInputDStream`s are stopped after a timeout or when there are no
more unallocated blocks; they cannot clear allocated blocks because jobs could
still be running
3. `JobGenerator` stops after the last batch is done or times out, and is
clear to clean

Given this sequence, I think the only valid place to start cleaning is when
the `JobGenerator` is ready to stop.
**Suggestions**
For unallocated blocks, we can add a `clearUnallocatedBlocks` method to
`ReceivedBlockTracker` and call it in `JobGenerator` as the last stop step.
For allocated blocks, either or both of the following:
- Call `graph.clearMetadata` as the last stop step of `JobGenerator`
- Override the `unpersist` method of `BlockRDD` to remove its blocks, so
that blocks of persisted RDDs are handled by the context cleaner; maybe also
call `persist` upon `BlockRDD` creation to ensure that all `BlockRDD`
instances are handled by the context cleaner
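A rough sketch of both suggestions, using simplified stand-in types rather than Spark's actual internals (every class and method below is an illustrative assumption, not the real API):

```scala
import scala.collection.mutable

// Stand-in for the block manager: just a set of stored block ids.
class BlockStoreStub {
  val stored = mutable.Set.empty[String]
  def putBlock(id: String): Unit = stored += id
  def removeBlock(id: String): Unit = stored -= id
}

// Suggestion 1 (sketch): ReceivedBlockTracker gains clearUnallocatedBlocks(),
// to be invoked from JobGenerator's last stop step.
class ReceivedBlockTrackerSketch(store: BlockStoreStub) {
  private val unallocated = mutable.Queue.empty[String]
  def addBlock(id: String): Unit = { store.putBlock(id); unallocated += id }
  def clearUnallocatedBlocks(): Unit =
    unallocated.dequeueAll(_ => true).foreach(store.removeBlock)
}

// Suggestion 2 (sketch): a BlockRDD whose unpersist also drops the underlying
// blocks, so the context cleaner reclaims them like any other persisted RDD.
class BlockRDDSketch(store: BlockStoreStub, blockIds: Seq[String]) {
  def unpersist(): Unit = blockIds.foreach(store.removeBlock)
}
```

The point of the second sketch is that once block removal rides on `unpersist`, the existing context-cleaner path covers `BlockRDD`s with no streaming-specific cleanup code.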