Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2366#discussion_r17825538
  
    --- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with 
Matchers with BeforeAndAfter
         assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
         assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
       }
    +
    +  test("get peers with store addition and removal") {
    +    val numStores = 4
    +    val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, 
s"store$i") }
    +    val storeIds = stores.map { _.blockManagerId }.toSet
    +    assert(master.getPeers(stores(0).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(0).blockManagerId })
    +    assert(master.getPeers(stores(1).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(1).blockManagerId })
    +    assert(master.getPeers(stores(2).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(2).blockManagerId })
    +
    +    // Add driver store and test whether it is filtered out
    +    val driverStore = makeBlockManager(1000, "<driver>")
    +    assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
    +    assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
    +    assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
    +
    +    // Add a new store and test whether get peers returns it
    +    val newStore = makeBlockManager(1000, s"store$numStores")
    +    assert(master.getPeers(stores(0).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(0).blockManagerId } + 
newStore.blockManagerId)
    +    assert(master.getPeers(stores(1).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(1).blockManagerId } + 
newStore.blockManagerId)
    +    assert(master.getPeers(stores(2).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(2).blockManagerId } + 
newStore.blockManagerId)
    +    assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
    +
    +    // Remove a store and test whether get peers returns it
    +    val storeIdToRemove = stores(0).blockManagerId
    +    master.removeExecutor(storeIdToRemove.executorId)
    +    
assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
    +    
assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
    +    
assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
    +
    +    // Test whether asking for peers of a unregistered block manager id 
returns empty list
    +    assert(master.getPeers(stores(0).blockManagerId).isEmpty)
    +    assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
    +  }
    +
    +  test("block replication - 2x") {
    +    testReplication(2,
    +      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, 
MEMORY_AND_DISK_SER_2)
    +    )
    +  }
    +
    +  test("block replication - 3x") {
    +    // Generate storage levels with 3x replication
    +    val storageLevels = {
    +      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, 
MEMORY_AND_DISK_SER).map {
    +        level => StorageLevel(
    +          level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 3)
    +      }
    +    }
    +    testReplication(3, storageLevels)
    +  }
    +
    +  test("block replication - mixed between 1x to 5x") {
    +    // Generate storage levels with varying replication
    +    val storageLevels = Seq(
    +      MEMORY_ONLY,
    +      MEMORY_ONLY_SER_2,
    +      StorageLevel(true, false, false, false, 3),
    +      StorageLevel(true, true, false, true, 4),
    +      StorageLevel(true, true, false, false, 5),
    +      StorageLevel(true, true, false, true, 4),
    +      StorageLevel(true, false, false, false, 3),
    +      MEMORY_ONLY_SER_2,
    +      MEMORY_ONLY
    +    )
    +    testReplication(5, storageLevels)
    +  }
    +
    +  test("block replication with addition and deletion of executors") {
    +    val blockSize = 1000
    +    val storeSize = 10000
    +    val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, 
s"store$i") }
    +
    +    def testPut(blockId: String, storageLevel: StorageLevel, 
expectedNumLocations: Int) {
    +      try {
    +        initialStores(0).putSingle(blockId, new Array[Byte](blockSize), 
storageLevel)
    +        assert(master.getLocations(blockId).size === expectedNumLocations)
    +      } finally {
    +        master.removeBlock(blockId)
    +      }
    --- End diff --
    
    This is confusing. The reason there is a `try...finally` is that this 
function `testPut` can be retried over and over again inside an `eventually` 
block. So if the assertion fails, the system does not move on to the next unit 
test. The system needs to be reset by removing the block that was added. 
    
    I guess I will make that clearer in the comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to