Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2366#discussion_r17825562
  
    --- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with 
Matchers with BeforeAndAfter
         assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
         assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
       }
    +
    +  test("get peers with store addition and removal") {
    +    val numStores = 4
    +    val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, 
s"store$i") }
    +    val storeIds = stores.map { _.blockManagerId }.toSet
    +    assert(master.getPeers(stores(0).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(0).blockManagerId })
    +    assert(master.getPeers(stores(1).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(1).blockManagerId })
    +    assert(master.getPeers(stores(2).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(2).blockManagerId })
    +
    +    // Add driver store and test whether it is filtered out
    +    val driverStore = makeBlockManager(1000, "<driver>")
    +    assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
    +    assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
    +    assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
    +
    +    // Add a new store and test whether get peers returns it
    +    val newStore = makeBlockManager(1000, s"store$numStores")
    +    assert(master.getPeers(stores(0).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(0).blockManagerId } + 
newStore.blockManagerId)
    +    assert(master.getPeers(stores(1).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(1).blockManagerId } + 
newStore.blockManagerId)
    +    assert(master.getPeers(stores(2).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(2).blockManagerId } + 
newStore.blockManagerId)
    +    assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
    +
    +    // Remove a store and test whether get peers returns it
    +    val storeIdToRemove = stores(0).blockManagerId
    +    master.removeExecutor(storeIdToRemove.executorId)
    +    
assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
    +    
assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
    +    
assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
    +
    +    // Test whether asking for peers of a unregistered block manager id 
returns empty list
    +    assert(master.getPeers(stores(0).blockManagerId).isEmpty)
    +    assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
    +  }
    +
    +  test("block replication - 2x") {
    +    testReplication(2,
    +      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, 
MEMORY_AND_DISK_SER_2)
    +    )
    +  }
    +
    +  test("block replication - 3x") {
    +    // Generate storage levels with 3x replication
    +    val storageLevels = {
    +      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, 
MEMORY_AND_DISK_SER).map {
    +        level => StorageLevel(
    +          level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 3)
    +      }
    +    }
    +    testReplication(3, storageLevels)
    +  }
    +
    +  test("block replication - mixed between 1x to 5x") {
    +    // Generate storage levels with varying replication
    +    val storageLevels = Seq(
    +      MEMORY_ONLY,
    +      MEMORY_ONLY_SER_2,
    +      StorageLevel(true, false, false, false, 3),
    +      StorageLevel(true, true, false, true, 4),
    +      StorageLevel(true, true, false, false, 5),
    +      StorageLevel(true, true, false, true, 4),
    +      StorageLevel(true, false, false, false, 3),
    +      MEMORY_ONLY_SER_2,
    +      MEMORY_ONLY
    +    )
    +    testReplication(5, storageLevels)
    +  }
    +
    +  test("block replication with addition and deletion of executors") {
    +    val blockSize = 1000
    +    val storeSize = 10000
    +    val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, 
s"store$i") }
    +
    +    def testPut(blockId: String, storageLevel: StorageLevel, 
expectedNumLocations: Int) {
    +      try {
    +        initialStores(0).putSingle(blockId, new Array[Byte](blockSize), 
storageLevel)
    +        assert(master.getLocations(blockId).size === expectedNumLocations)
    +      } finally {
    +        master.removeBlock(blockId)
    +      }
    +    }
    +
    +    // 2x replication should work, 3x replication should only replicate 2x
    +    testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2)
    +    testPut("a2", StorageLevel(true, true, false, true, 3), 2)
    +
    +    // Add another store, 3x replication should work now, 4x replication 
should only replicate 3x
    +    val newStore1 = makeBlockManager(storeSize, s"newstore1")
    +    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
    +      testPut("a3", StorageLevel(true, true, false, true, 3), 3)
    +    }
    +    testPut("a4",StorageLevel(true, true, false, true, 4), 3)
    +
    +    // Add another store, 4x replication should work now
    +    val newStore2 = makeBlockManager(storeSize, s"newstore2")
    +    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
    +      testPut("a5", StorageLevel(true, true, false, true, 4), 4)
    +    }
    +
    +    // Remove all but the 1st store, 2x replication should fail
    +    (initialStores.slice(1, initialStores.size) ++ Seq(newStore1, 
newStore2)).foreach {
    --- End diff --
    
    Done. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to