This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 84620f2b877 [SPARK-44241][CORE] Mistakenly set io.connectionTimeout/connectionCreationTimeout to zero or negative will cause incessant executor cons/destructions 84620f2b877 is described below commit 84620f2b877b9ea52b95343ca46d069a906e28a9 Author: Kent Yao <y...@apache.org> AuthorDate: Fri Jun 30 18:33:16 2023 +0800 [SPARK-44241][CORE] Mistakenly set io.connectionTimeout/connectionCreationTimeout to zero or negative will cause incessant executor cons/destructions ### What changes were proposed in this pull request? This PR treats a negative io.connectionTimeout/connectionCreationTimeout as zero. Zero here means: - connectionCreationTimeout = 0: an unlimited CONNECT_TIMEOUT_MILLIS for connection establishment; - connectionTimeout = 0: the `IdleStateHandler` that triggers `IdleStateEvent` is disabled. ### Why are the changes needed? 1. This PR fixes a bug when connectionCreationTimeout is 0, which Netty treats as unlimited, but ChannelFuture.await(0) fails immediately and inappropriately. 2. This PR fixes a bug when connectionCreationTimeout is less than 0, which causes meaningless transport client reconnections and endless executor reconstructions. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit tests. Closes #41785 from yaooqinn/SPARK-44241. 
Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Kent Yao <y...@apache.org> (cherry picked from commit 38645fa470b5af7c2e41efa4fb092bdf2463fbbd) Signed-off-by: Kent Yao <y...@apache.org> --- .../network/client/TransportClientFactory.java | 16 +++++++++-- .../apache/spark/network/util/TransportConf.java | 4 +-- .../client/TransportClientFactorySuite.java | 33 +++++++++++++++++++--- 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index 43408d43e57..188e4ba0f8e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -249,12 +249,13 @@ public class TransportClientFactory implements Closeable { logger.debug("Creating new connection to {}", address); Bootstrap bootstrap = new Bootstrap(); + int connCreateTimeout = conf.connectionCreationTimeoutMs(); bootstrap.group(workerGroup) .channel(socketChannelClass) // Disable Nagle's Algorithm since we don't want packets to wait .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionCreationTimeoutMs()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connCreateTimeout) .option(ChannelOption.ALLOCATOR, pooledAllocator); if (conf.receiveBuf() > 0) { @@ -280,10 +281,19 @@ public class TransportClientFactory implements Closeable { // Connect to the remote server long preConnect = System.nanoTime(); ChannelFuture cf = bootstrap.connect(address); - if (!cf.await(conf.connectionCreationTimeoutMs())) { + + if (connCreateTimeout <= 0) { + cf.awaitUninterruptibly(); + assert cf.isDone(); + if (cf.isCancelled()) { + throw new IOException(String.format("Connecting to %s cancelled", 
address)); + } else if (!cf.isSuccess()) { + throw new IOException(String.format("Failed to connect to %s", address), cf.cause()); + } + } else if (!cf.await(connCreateTimeout)) { throw new IOException( String.format("Connecting to %s timed out (%s ms)", - address, conf.connectionCreationTimeoutMs())); + address, connCreateTimeout)); } else if (cf.cause() != null) { throw new IOException(String.format("Failed to connect to %s", address), cf.cause()); } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 9dedd5d9849..7c2a408d86d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -103,7 +103,7 @@ public class TransportConf { conf.get("spark.network.timeout", "120s")); long defaultTimeoutMs = JavaUtils.timeStringAsSec( conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000; - return (int) defaultTimeoutMs; + return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs; } /** Connect creation timeout in milliseconds. Default 30 secs. */ @@ -111,7 +111,7 @@ public class TransportConf { long connectionTimeoutS = TimeUnit.MILLISECONDS.toSeconds(connectionTimeoutMs()); long defaultTimeoutMs = JavaUtils.timeStringAsSec( conf.get(SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY, connectionTimeoutS + "s")) * 1000; - return (int) defaultTimeoutMs; + return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs; } /** Number of concurrent connections between two nodes for fetching data. 
*/ diff --git a/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java index 277ff85db7b..dbbe1540cff 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java @@ -31,10 +31,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; - import org.apache.spark.network.TestUtils; import org.apache.spark.network.TransportContext; import org.apache.spark.network.server.NoOpRpcHandler; @@ -45,6 +41,8 @@ import org.apache.spark.network.util.MapConfigProvider; import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.TransportConf; +import static org.junit.Assert.*; + public class TransportClientFactorySuite { private TransportConf conf; private TransportContext context; @@ -239,4 +237,31 @@ public class TransportClientFactorySuite { Assert.assertThrows("fail this connection directly", IOException.class, () -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, true)); } + + @Test + public void unlimitedConnectionAndCreationTimeouts() throws IOException, InterruptedException { + Map<String, String> configMap = new HashMap<>(); + configMap.put("spark.shuffle.io.connectionTimeout", "-1"); + configMap.put("spark.shuffle.io.connectionCreationTimeout", "-1"); + TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap)); + RpcHandler rpcHandler = new NoOpRpcHandler(); + try (TransportContext ctx = new TransportContext(conf, rpcHandler, true); + TransportClientFactory factory = ctx.createClientFactory()){ + TransportClient c1 = 
factory.createClient(TestUtils.getLocalHost(), server1.getPort()); + assertTrue(c1.isActive()); + long expiredTime = System.currentTimeMillis() + 5000; + while (c1.isActive() && System.currentTimeMillis() < expiredTime) { + Thread.sleep(10); + } + assertTrue(c1.isActive()); + // When connectionCreationTimeout is unlimited, the connection shall be able to + // fail when the server is not reachable. + TransportServer server = ctx.createServer(); + int unreachablePort = server.getPort(); + JavaUtils.closeQuietly(server); + IOException exception = Assert.assertThrows(IOException.class, + () -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, true)); + assertNotEquals(exception.getCause(), null); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org