otterc commented on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805061280
> hmm.. I'm trying to understand how those 2 scenarios cause the merged block to become corrupted.
>
> 1. Do you mean calling `StreamCallback.onFailure()` 2 times causes the block corruption? It seems like all `onFailure` does is `setCurrentMapIndex(-1)` and `setEncounteredFailure(true)`, and those don't touch the files, e.g., reset the position or truncate.
> 2. I can see how the duplicate stream may interfere with an active stream, e.g., the active stream may see `getCurrentMapIndex` < 0 and `isEncounteredFailure=true` while writing normally itself. But it seems like the active stream is able to heal itself within the current framework.
>
> I probably missed some details. Could you elaborate more on how the corruption happens? Thanks.
In both scenarios, the `currentMapId` of the shuffle partition is reset to `-1`, which can interfere with an active stream (the stream that is currently writing). By interfering, I mean it gives another stream, which is waiting to merge into the same shuffle partition, a chance to start writing before the active stream has completed either successfully or with a failure.
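To make the interference concrete, here is a minimal sketch of the state that both scenarios below manipulate. This is plain Java for illustration only: `PartitionState`, `StreamSketch`, and the field/method names are simplified stand-ins, not the actual `RemoteBlockPushResolver` classes.

```java
// Simplified model of the per-partition merge state and a push stream's
// callbacks. Names are illustrative, not the real Spark classes.
class PartitionState {
  // mapId of the stream currently allowed to append; -1 means the partition is free.
  int currentMapId = -1;
}

class StreamSketch {
  final int mapId;
  final PartitionState partition;
  boolean isWriting = false;

  StreamSketch(int mapId, PartitionState partition) {
    this.mapId = mapId;
    this.partition = partition;
  }

  // A stream may append when the partition is free or when it already owns it.
  boolean allowedToWrite() {
    return partition.currentMapId < 0 || partition.currentMapId == mapId;
  }

  void onData() {
    if (allowedToWrite()) {
      isWriting = true;
      partition.currentMapId = mapId;   // claim the partition
      // ... append the buffer to the merged file ...
    } else {
      // ... defer the buffer ...
    }
  }

  // The problematic behavior: as long as this stream ever wrote, a failure
  // frees the partition, even if another stream has claimed it in the meantime.
  void onFailure() {
    if (isWriting) {
      partition.currentMapId = -1;
    }
  }
}
```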
Providing examples for both of these:
1. When `onFailure` is called twice
- Say stream1, merging `shufflePush_0_1_2`, wrote some data and has `isWriting=true`. It then fails, so it sets `currentMapId` of partition_0_2 to `-1`.
- Another stream2 which wants to merge `shufflePush_0_2_2` can now start
merging its bufs to partition_0_2 and it sets `currentMapId` of partition_0_2
to `2`.
- Another stream3 which wants to merge `shufflePush_0_3_2` will defer its
buffers because `stream2` is the active one right now (`currentMapId` is 2).
- stream2 has only merged a few bufs, but then `stream1.onFailure()` is invoked again, and that changes the `currentMapId` of partition_0_2 to `-1`. This becomes a problem because `stream2` hasn't completed successfully (or with a failure) and now `stream3` is `allowedToWrite`. If `stream3` starts writing buffers when `stream2` has not appended all of its buffers, the data of `shufflePush_0_2_2` will be corrupted (this interleaving is replayed in the sketch after these examples).
2. Duplicate stream.
- Say stream1, merging `shufflePush_0_1_2`, wrote some data and has `isWriting=true`. It completes successfully and then sets `currentMapId` of partition_0_2 to `-1`.
- Now `stream1duplicate`, which is also trying to merge `shufflePush_0_1_2`, will be `allowedToWrite` because the `currentMapId` of partition_0_2 is `-1`, and it sets `isWriting=true`. However, we identify that it is a duplicate stream and just return without modifying `currentMapId`.
- stream2, which tries to merge `shufflePush_0_2_2`, will be `allowedToWrite` because `currentMapId=-1`. It sets `currentMapId=2` and starts writing.
- If `stream1duplicate` encounters a failure now, it has `isWriting` set and so it can reset `currentMapId` of partition_0_2. This again gives another stream, say stream3, a chance to be `allowedToWrite` before stream2 has completed.
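For scenario 1, the interleaving can be replayed against the simplified model sketched above. Again, this is just an illustration of the sequence, not the actual UTs added in this PR:

```java
// Hypothetical replay of scenario 1 using the PartitionState / StreamSketch
// sketch above. Scenario 2 follows the same pattern, with the duplicate
// stream's onFailure() playing the role of the second reset.
public class DoubleOnFailureReplay {
  public static void main(String[] args) {
    PartitionState partition = new PartitionState();        // partition_0_2

    StreamSketch stream1 = new StreamSketch(1, partition);  // shufflePush_0_1_2
    StreamSketch stream2 = new StreamSketch(2, partition);  // shufflePush_0_2_2
    StreamSketch stream3 = new StreamSketch(3, partition);  // shufflePush_0_3_2

    stream1.onData();      // stream1 claims partition_0_2, isWriting = true
    stream1.onFailure();   // first failure: currentMapId reset to -1

    stream2.onData();      // stream2 claims partition_0_2 (currentMapId = 2)
    System.out.println(stream3.allowedToWrite());  // false -> stream3 defers

    stream1.onFailure();   // second failure: currentMapId reset to -1 again
    System.out.println(stream3.allowedToWrite());  // true -> stream3 may now write
                                                   // while stream2 is still mid-merge
  }
}
```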
I have added UTs for both these cases as well with similar examples.
@Ngone51