LuciferYang commented on code in PR #49350:
URL: https://github.com/apache/spark/pull/49350#discussion_r1903017283
##########
core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala:
##########
@@ -78,7 +78,13 @@ class HostLocalShuffleReadingSuite extends SparkFunSuite with Matchers with Loca
   test(s"host local shuffle reading with external shuffle service $essStatus") {
     conf.set(SHUFFLE_SERVICE_ENABLED, isESSEnabled)
       .set(STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE, 5)
-    sc = new SparkContext("local-cluster[2,1,1024]", "test-host-local-shuffle-reading", conf)
+    val master = if (isESSEnabled) {
+      conf.set(EXECUTOR_CORES, 1)
+      "local-cluster[1,2,2048]"
+    } else {
+      "local-cluster[2,1,1024]"
+    }
+    sc = new SparkContext(master, "test-host-local-shuffle-reading", conf)
Review Comment:
Interesting ~ although the configurations are different, the executors that get launched are the same, as seen in the screenshot above. Why did the original configuration cause the test to fail?
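For reference, a hedged sketch of what the two master strings mean (the `local-cluster[...]` format is `[numWorkers, coresPerWorker, memoryPerWorkerMB]`; the comments are my reading of the setup, not something stated in the PR):

```scala
object MasterStringSketch {
  // Original: two workers with one core each, so standalone scheduling
  // places one 1-core executor on each worker.
  val originalMaster = "local-cluster[2,1,1024]"

  // PR (ESS branch): a single worker with two cores. Combined with
  // spark.executor.cores=1, that one worker launches two 1-core executors
  // side by side, so both executors sit behind the same worker.
  val newMaster = "local-cluster[1,2,2048]"
}
```

Either way two 1-core executors come up, which matches the screenshot; the difference is only how many workers host them.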
##########
core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala:
##########
@@ -127,7 +145,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   }

   /** Clean up all shuffle files associated with an application that has exited. */
-  def applicationRemoved(appId: String): Unit = {
+  def applicationRemoved(appId: String, cleanupLocalDirs: Boolean = true): Unit = {
Review Comment:
Why is the parameter `cleanupLocalDirs` added here but not used?
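If the flag is meant to reach the cleanup path, here is a minimal sketch of the wiring (assuming the method body still delegates to `blockHandler.applicationRemoved`, which already accepts a `cleanupLocalDirs` boolean):

```scala
/** Clean up all shuffle files associated with an application that has exited. */
def applicationRemoved(appId: String, cleanupLocalDirs: Boolean = true): Unit = {
  // Forward the new parameter instead of a hardcoded value; otherwise it is dead.
  blockHandler.applicationRemoved(appId, cleanupLocalDirs)
}
```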
##########
common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java:
##########
@@ -151,7 +153,10 @@ protected void initChannel(SocketChannel ch) {
     InetSocketAddress localAddress = (InetSocketAddress) channelFuture.channel().localAddress();
     port = localAddress.getPort();
-    logger.debug("Shuffle server started on {} with port {}", localAddress.getHostString(), port);
+    logger.info("{} server started on {} with port {}",
Review Comment:
It seems this change is more like a separate logging bug fix: the old message hardcoded "Shuffle server" (and only at DEBUG) even for `TransportServer` instances that serve other endpoints.
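A self-contained sketch of the before/after, in Scala for brevity (the real code is Java in `TransportServer`; `serverName` is a placeholder for whatever the PR passes as the first template argument, which is elided in the hunk above):

```scala
import org.slf4j.LoggerFactory

object ServerStartLogSketch {
  private val logger = LoggerFactory.getLogger(getClass)

  def logStarted(serverName: String, host: String, port: Int): Unit = {
    // Old behavior: every TransportServer, shuffle-related or not, claimed
    // to be a "Shuffle server", and the line was only visible at DEBUG.
    logger.debug("Shuffle server started on {} with port {}", host, Integer.valueOf(port))

    // New behavior per the diff: the server reports its own name, at INFO.
    logger.info("{} server started on {} with port {}", serverName, host, Integer.valueOf(port))
  }
}
```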
##########
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala:
##########
@@ -171,6 +172,12 @@ private[deploy] class ExecutorRunner(
   // parent process for the executor command
   builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
+  if (shuffleServicePort.isDefined && LocalSparkCluster.get.isDefined) {
Review Comment:
Do we need to add a check for `Utils.isTesting`?
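For reference, a sketch of the suggested guard (`Utils.isTesting` is the existing helper in `org.apache.spark.util.Utils`; the body is a placeholder since the hunk above elides it):

```scala
if (Utils.isTesting && shuffleServicePort.isDefined && LocalSparkCluster.get.isDefined) {
  // ... existing shuffle-service environment setup for local-cluster mode ...
}
```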