GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/3191

    [SPARK-3495][SPARK-3496] Backporting fix made in master to branch 1.1

    The original PR was #2366 
    
    This backport was non-trivial because Spark 1.1 uses ConnectionManager 
instead of NioBlockTransferService, which required slight modifications to the 
unit tests. Other than that, the code is exactly the same as in the original 
PR. Please refer to the discussion in the original PR if you have any thoughts.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark replication-fix-branch-1.1-backport

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3191.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3191
    
----
commit de4ff73f7aa2379d7c4e40c558ad0e366e9ef02d
Author: Tathagata Das <[email protected]>
Date:   2014-10-02T20:49:47Z

    [SPARK-3495] Block replication fails continuously when the replication 
target node is dead AND [SPARK-3496] Block replication by mistake chooses 
driver as target
    
    If a block manager (say, A) wants to replicate a block and the node chosen 
for replication (say, B) is dead, then the attempt to send the block to B 
fails, and it keeps failing indefinitely. Even after the driver learns that B 
is dead, A keeps trying to replicate to B and keeps failing.
    
    The cause of this bug is that A initially fetches a list of peers from 
the driver (while B is still alive), but never updates it after B dies. This 
affects Spark Streaming, because its receivers use block replication.
    
    This patch makes the following changes:
    - Changed BlockManagerMaster to return all the peers of a block manager, 
rather than the requested number. It also filters out the driver BlockManager.
    - Refactored BlockManager's replication code to handle peer caching 
correctly.
        + The peer for replication is selected randomly. This differs from 
past behavior, where for a node A, a node B was chosen deterministically for 
the lifetime of the application.
        + If replication to one node fails, the peers are refetched.
        + The cached peer list has a TTL of 1 second, so that new peers are 
discovered and used for replication.
    - Refactored the use of <driver> in BlockManager into a new method 
`BlockManagerId.isDriver`.
    - Added replication unit tests (replication was not tested till now, duh!)
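
    The peer handling described above can be sketched roughly as follows. This 
is not the code from the patch, only a minimal illustration under stated 
assumptions: PeerId, ReplicationPeers, fetchAllPeers and send are hypothetical 
names, fetchAllPeers stands in for the request to the master, and the driver is 
assumed to be identified by the executor id "<driver>".

```scala
import scala.util.Random

// Hypothetical stand-in for a block manager id; only the fields used here.
final case class PeerId(executorId: String, host: String) {
  // Assumption: the driver's block manager is registered as "<driver>".
  def isDriver: Boolean = executorId == "<driver>"
}

// Sketch of a TTL-bounded peer cache with random peer selection and
// refetch on failure. nowMs and random are injectable to ease testing.
class ReplicationPeers(
    fetchAllPeers: () => Seq[PeerId],
    ttlMs: Long = 1000L,
    nowMs: () => Long = () => System.currentTimeMillis(),
    random: Random = new Random()) {

  private var cached: Seq[PeerId] = Seq.empty
  private var lastFetchMs: Long = 0L
  private var fetched = false

  // Return the cached peers, refetching when forced, when nothing has been
  // fetched yet, or when the cache is older than the TTL.
  def peers(forceFetch: Boolean = false): Seq[PeerId] = synchronized {
    val expired = nowMs() - lastFetchMs > ttlMs
    if (forceFetch || !fetched || expired) {
      cached = fetchAllPeers().filterNot(_.isDriver) // never target the driver
      lastFetchMs = nowMs()
      fetched = true
    }
    cached
  }

  // Send to randomly chosen peers until one send succeeds or the attempts
  // are used up. A failed send forces a refetch and excludes that peer.
  def replicate(send: PeerId => Boolean, maxAttempts: Int = 3): Option[PeerId] = {
    var failed = Set.empty[PeerId]
    var forceFetch = false
    var attempt = 0
    while (attempt < maxAttempts) {
      val candidates = peers(forceFetch).filterNot(failed.contains)
      if (candidates.isEmpty) return None
      val target = candidates(random.nextInt(candidates.size))
      if (send(target)) return Some(target)
      failed += target
      forceFetch = true
      attempt += 1
    }
    None
  }
}
```

    In this sketch a dead peer costs at most one failed attempt per block, 
because the failure both excludes that peer and forces a fresh peer list, 
instead of the stale list being reused for the lifetime of the application.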
    
    This should not make a difference in performance of Spark workloads where 
replication is not used.
    
    @andrewor14 @JoshRosen
    
    Author: Tathagata Das <[email protected]>
    
    Closes #2366 from tdas/replication-fix and squashes the following commits:
    
    9690f57 [Tathagata Das] Moved replication tests to a new 
BlockManagerReplicationSuite.
    0661773 [Tathagata Das] Minor changes based on PR comments.
    a55a65c [Tathagata Das] Added a unit test to test replication behavior.
    012afa3 [Tathagata Das] Bug fix
    89f91a0 [Tathagata Das] Minor change.
    68e2c72 [Tathagata Das] Made replication peer selection logic more 
efficient.
    08afaa9 [Tathagata Das] Made peer selection for replication deterministic 
to block id
    3821ab9 [Tathagata Das] Fixes based on PR comments.
    08e5646 [Tathagata Das] More minor changes.
    d402506 [Tathagata Das] Fixed imports.
    4a20531 [Tathagata Das] Filtered driver block manager from peer list, and 
also consolidated the use of <driver> in BlockManager.
    7598f91 [Tathagata Das] Minor changes.
    03de02d [Tathagata Das] Change replication logic to correctly refetch peers 
from master on failure and on new worker addition.
    d081bf6 [Tathagata Das] Fixed bug in get peers and unit tests to test 
get-peers and replication under executor churn.
    9f0ac9f [Tathagata Das] Modified replication tests to fail on replication 
bug.
    af0c1da [Tathagata Das] Added replication unit tests to BlockManagerSuite
    
    Conflicts:
        core/src/main/scala/org/apache/spark/storage/BlockManager.scala
        core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
        core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

----


