attilapiros commented on code in PR #45836: URL: https://github.com/apache/spark/pull/45836#discussion_r1702385601
########## core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala: ########## @@ -296,6 +306,26 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite } } + test("Test block location after replication with SHUFFLE_SERVICE_FETCH_RDD_ENABLED enabled") { + val shuffleServicePort = conf.get(SHUFFLE_SERVICE_PORT) + val store1 = makeBlockManager(10000, "host-1") + val store2 = makeBlockManager(10000, "host-2") + assert(master.getPeers(store1.blockManagerId).toSet === Set(store2.blockManagerId)) + + val blockId = RDDBlockId(1, 2) + val message = new Array[Byte](1000) + + // if SHUFFLE_SERVICE_FETCH_RDD_ENABLED is enabled, then shuffle port should be present. + store1.putSingle(blockId, message, StorageLevel.DISK_ONLY) + assert(master.getLocations(blockId).contains( + BlockManagerId("host-1", "localhost", shuffleServicePort, None))) + + // after block is removed, shuffle port should be removed. Review Comment: ```suggestion // After the block is removed, the external shuffle service should also be removed from the locations ``` ########## core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala: ########## @@ -296,6 +306,26 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite } } + test("Test block location after replication with SHUFFLE_SERVICE_FETCH_RDD_ENABLED enabled") { + val shuffleServicePort = conf.get(SHUFFLE_SERVICE_PORT) + val store1 = makeBlockManager(10000, "host-1") + val store2 = makeBlockManager(10000, "host-2") + assert(master.getPeers(store1.blockManagerId).toSet === Set(store2.blockManagerId)) Review Comment: We do not need this assertion.
########## core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala: ########## @@ -296,6 +306,26 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite } } + test("Test block location after replication with SHUFFLE_SERVICE_FETCH_RDD_ENABLED enabled") { + val shuffleServicePort = conf.get(SHUFFLE_SERVICE_PORT) + val store1 = makeBlockManager(10000, "host-1") + val store2 = makeBlockManager(10000, "host-2") + assert(master.getPeers(store1.blockManagerId).toSet === Set(store2.blockManagerId)) + + val blockId = RDDBlockId(1, 2) + val message = new Array[Byte](1000) + + // if SHUFFLE_SERVICE_FETCH_RDD_ENABLED is enabled, then shuffle port should be present. Review Comment: ```suggestion // As SHUFFLE_SERVICE_FETCH_RDD_ENABLED is enabled, the external shuffle service should be listed ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org