Github user liyezhang556520 commented on the pull request:
https://github.com/apache/spark/pull/2134#issuecomment-55356380
Hi @andrewor14, you are right about this, and it is also my concern; I tried to keep the risk as low as possible. Allow me to explain the background here:
One reason I removed the **spark.storage.unrollFraction** configuration is that **unrollFraction** is a fixed value, while the memory needed for caching varies. Suppose an application runs many iterations and each iteration caches some blocks; the cache memory required for the new blocks can differ from iteration to iteration. A fixed fraction then has side effects: in some iterations too many old blocks are dropped, which wastes time, and in others not enough old blocks are dropped, so the new blocks are dropped to disk (or elsewhere) even though there are still many old blocks that could be evicted to make room for them. In addition, when there are not many droppable old blocks (the memory held by droppable old blocks is less than `maxMemory * unrollFraction`), `ensureFreeSpace` will always return false. So it is hard for the user to decide on a good value for `unrollFraction`. The other reason is that removing it makes dropping old blocks in parallel easier to implement.
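To make the `ensureFreeSpace` point concrete, here is a minimal sketch (my own simplified model with made-up numbers, not the actual `MemoryStore` code): the unroll path asks to free the whole remaining unroll budget, roughly `maxMemory * unrollFraction`, and the check is all-or-nothing, so when the droppable old blocks cannot cover that budget the call fails every time, even if the new block itself needs far less memory.

```scala
// Simplified, hypothetical model of the fixed-budget behaviour described above.
object UnrollFractionSketch {
  val maxMemory: Long = 1000L        // hypothetical store capacity
  val unrollFraction: Double = 0.2   // fixed spark.storage.unrollFraction

  /** All-or-nothing free-space check modelled on the description above. */
  def ensureFreeSpaceSketch(freeBytes: Long, droppableOldBlockBytes: Long): Boolean = {
    val spaceToEnsure = (maxMemory * unrollFraction).toLong  // full unroll budget
    freeBytes + droppableOldBlockBytes >= spaceToEnsure
  }

  def main(args: Array[String]): Unit = {
    // Droppable old blocks hold 150 bytes, below the 200-byte budget, so the
    // check fails on every call, no matter how small the new block is.
    println(ensureFreeSpaceSketch(freeBytes = 0L, droppableOldBlockBytes = 150L)) // false
    println(ensureFreeSpaceSketch(freeBytes = 0L, droppableOldBlockBytes = 250L)) // true
  }
}
```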
As for the OOM problem, it is really hard to avoid entirely: there are two places with this risk in this patch, and one of the two also exists in the original implementation.
1. When we process blocks in `unrollSafely`, we go through the `iterator` to see whether the new block partitions can be put into memory, and the check interval is `memoryCheckPeriod` (default value 16). Since we have no idea how much memory will be required between checks, and this process runs in parallel across many threads, the memory already occupied by the new block partitions before the first check may be very large (a sketch of this check loop follows after this list). This can cause OOM when memory usage is already close to capacity. This situation exists in both this patch and the original implementation.
2. The second place is the one you pointed out. Yes, in this patch we lazily drop the old blocks while the new blocks are being unrolled in `unrollSafely`. In my implementation, at each check period, if old blocks need to be dropped, only the least number of old blocks is marked to be dropped for the current thread, just enough to satisfy the memory required by the new block partition. Those marked old blocks are then dropped to disk, and we continue going through the iterator until the next check point (a sketch of this marking step also follows below). Since only the least number of blocks is dropped, the difference between the to-be-dropped memory and the to-be-unrolled memory is kept to a minimum, and only that difference affects `freeMemoryForUnroll`, which in turn affects the unrolling of the other threads.
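For the first point, here is a rough standalone sketch of the periodic check it describes; the `sizeOf` and `reserveUnrollMemory` parameters and the initial 1 MB threshold are assumptions of mine, not the real `unrollSafely` signature:

```scala
// Minimal model of a periodic unroll-memory check (not the real unrollSafely).
def unrollSketch[T](iterator: Iterator[T], sizeOf: T => Long,
                    reserveUnrollMemory: Long => Boolean): Option[Vector[T]] = {
  val memoryCheckPeriod = 16            // check interval mentioned above
  var memoryThreshold = 1L << 20        // hypothetical initial reservation (1 MB)
  var currentSize = 0L
  var elementsUnrolled = 0
  var keepUnrolling = true
  val buffer = Vector.newBuilder[T]

  while (iterator.hasNext && keepUnrolling) {
    val element = iterator.next()
    buffer += element
    currentSize += sizeOf(element)
    elementsUnrolled += 1
    // Memory is only re-examined here, once every memoryCheckPeriod elements;
    // everything appended since the previous check is held without any
    // reservation, and with many threads unrolling at once that unchecked
    // memory is what can push the JVM over the edge near full capacity.
    if (elementsUnrolled % memoryCheckPeriod == 0 && currentSize >= memoryThreshold) {
      val amountToRequest = currentSize * 2 - memoryThreshold
      keepUnrolling = reserveUnrollMemory(amountToRequest)
      if (keepUnrolling) memoryThreshold += amountToRequest
    }
  }
  if (keepUnrolling) Some(buffer.result()) else None
}
```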
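For the second point, this is a sketch, with assumed types and a hypothetical `OldBlock` case class, of the "drop the least number of old blocks" policy: walk the old blocks and mark just enough of them to cover the current thread's shortfall, so the gap between the dropped memory and the unrolled memory stays as small as possible.

```scala
final case class OldBlock(id: String, size: Long)

/** Mark only enough old blocks to cover the current thread's shortfall. */
def selectBlocksToDrop(oldBlocks: Seq[OldBlock], bytesNeeded: Long): Seq[OldBlock] = {
  val selected = Seq.newBuilder[OldBlock]
  var freed = 0L
  val it = oldBlocks.iterator
  while (freed < bytesNeeded && it.hasNext) {
    val block = it.next()
    selected += block      // marked for the current thread only
    freed += block.size
  }
  // The caller drops exactly these blocks to disk and resumes unrolling until
  // the next check point; only the leftover (freed - bytesNeeded) spills into
  // freeMemoryForUnroll and becomes visible to the other unrolling threads.
  selected.result()
}
```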
There are two phases that need to drop blocks in the whole procedure: one is `unrollSafely` and the other is `tryToPut`. There is no OOM risk in `tryToPut`, since all the data has already been materialized in memory by the time `tryToPut` is called. Fortunately, `spark.storage.safetyFraction` lowers the risk further, but I think the OOM risk will still exist.
Another way is simply to drop the new blocks to disk when there is not enough free memory, without dropping old blocks at all. This also gains a lot of performance compared with dropping old blocks serially, and in our tests its performance is very close to dropping old blocks in parallel.
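A sketch of that fallback, with hypothetical helper functions passed in as parameters (none of these names come from the patch): when unroll memory cannot be reserved, the new block goes straight to disk, and no old block is evicted at all, serially or in parallel.

```scala
/** If the new block cannot reserve memory, spill it to disk right away. */
def putBlock[T](blockId: String, values: Iterator[T],
                tryReserveMemory: Long => Boolean, estimatedSize: Long,
                putInMemory: (String, Iterator[T]) => Unit,
                putOnDisk: (String, Iterator[T]) => Unit): Unit = {
  if (tryReserveMemory(estimatedSize)) {
    putInMemory(blockId, values)   // enough free memory: cache the new block
  } else {
    putOnDisk(blockId, values)     // otherwise spill the new block directly
  }
}
```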