Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/6990#discussion_r33747816
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
---
@@ -833,8 +833,10 @@ private[spark] class BlockManager(
    logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    // Either we're storing bytes and we asynchronously started replication, or we're storing
-   // values and need to serialize and replicate them now:
-   if (putLevel.replication > 1) {
+   // values and need to serialize and replicate them now.
+   // Should not replicate the block if its StorageLevel is StorageLevel.NONE or
+   // putting it locally failed.
+   if (!putBlockInfo.isFailed && putLevel.replication > 1) {
--- End diff --
Case 2: Getting the block status from the remote peer is definitely not the
goal of a block manager and is not the intended semantics of PutResult.
Case 1: I think I get your point. From the receiver's point of view, the local
store has failed, so why even replicate? However, from the point of view of the
block manager, which manages blocks for both receivers and RDD partitions,
it is definitely tricky to change the semantics right now. For RDD
partitions, this change would be a regression, since the replica used to still
help speed things up.
Let me first confirm: is the problem you are trying to solve that
there are received blocks that get inserted but never cleared?
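For readers skimming the thread, the change under discussion boils down to one extra condition on the replication path: replicate only when the local put succeeded and the StorageLevel actually requests replicas. A minimal sketch of that predicate, using hypothetical stand-in types (not Spark's actual `PutResult`/`BlockInfo` classes):

```scala
// Stand-in types for illustration only; Spark's real BlockInfo and
// StorageLevel carry much more state than this.
case class PutOutcome(isFailed: Boolean)
case class Level(replication: Int)

// The guard added in the diff: skip replication when the local put
// failed, or when the level does not ask for more than one copy.
def shouldReplicate(outcome: PutOutcome, level: Level): Boolean =
  !outcome.isFailed && level.replication > 1

object Demo extends App {
  assert(!shouldReplicate(PutOutcome(isFailed = true), Level(2)))  // local put failed: skip
  assert(!shouldReplicate(PutOutcome(isFailed = false), Level(1))) // no replication requested
  assert(shouldReplicate(PutOutcome(isFailed = false), Level(2)))  // e.g. a MEMORY_ONLY_2 put
  println("ok")
}
```

The point of contention in the thread is only the first clause: whether skipping replication after a failed local put is correct for receivers but a regression for RDD partitions.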
On Wed, Jul 1, 2015 at 9:41 PM, Dibyendu Bhattacharya <
[email protected]> wrote:
> In core/src/main/scala/org/apache/spark/storage/BlockManager.scala
> <https://github.com/apache/spark/pull/6990#discussion_r33746918>:
>
> > @@ -833,8 +833,10 @@ private[spark] class BlockManager(
> >     logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
> >
> >     // Either we're storing bytes and we asynchronously started replication, or we're storing
> > -   // values and need to serialize and replicate them now:
> > -   if (putLevel.replication > 1) {
> > +   // values and need to serialize and replicate them now.
> > +   // Should not replicate the block if its StorageLevel is StorageLevel.NONE or
> > +   // putting it locally failed.
> > +   if (!putBlockInfo.isFailed && putLevel.replication > 1) {
>
> Hi @tdas <https://github.com/tdas> it was indeed a race condition. I did
> not see your reply before posting mine :)
>
> I understand your point completely that option 3 is better, and it is
> also true that the MEMORY_AND_DISK cases won't have this block-dropping
> issue. But if someone uses the MEMORY_ONLY_2 setting, there is a
> possible problem.
>
> Nevertheless, my point here was a little different. There are two possible
> cases being discussed here.
>
> Case 1: Even in the present state, for the MEMORY_ONLY_2 StorageLevel, if a
> block fails to be stored locally but is replicated to a remote peer, the
> ReceivedBlockHandler won't get the block id and will throw an exception.
> It is then up to the Receiver implementation to ignore this exception or
> retry, depending on how resilient the Receiver is towards data loss. My
> point is: if the Receiver does not get the block id anyway when the
> BlockManager fails to store locally, why even try to replicate? It
> unnecessarily consumes additional memory on the remote peer. And I guess it
> can happen to any client which calls BlockManager doPut to store a block and
> then checks the updatedBlocks object, which is a Seq[(BlockId, BlockStatus)],
> to see if the block id exists. Since updatedBlocks does not say whether
> replication succeeded, the client won't find the blockId if the local store
> failed.
>
> Case 2: What @squito <https://github.com/squito> is saying is also
> correct. Even if a block fails to be stored locally but is replicated to a
> remote peer, why can't the ReceivedBlockHandler get the block id from the
> replicated node before throwing an exception? But that needs a change in
> ReceivedBlockHandler to get block details from remote peers. And in this
> case the desired replication level would be reduced to 1 instead of 2.
>
> If we try option 3, even in that case, I guess ReceivedBlockHandler
> needs changes to get the block id from the remote peer.
>
> —
> Reply to this email directly or view it on GitHub
> <https://github.com/apache/spark/pull/6990/files#r33746918>.
>