Github user aarondav commented on a diff in the pull request:
https://github.com/apache/spark/pull/3625#discussion_r21406760
--- Diff:
network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
---
@@ -97,23 +116,45 @@ public TransportClient createClient(String remoteHost,
int remotePort) throws IO
// Get connection from the connection pool first.
// If it is not found or not active, create a new one.
final InetSocketAddress address = new InetSocketAddress(remoteHost,
remotePort);
- TransportClient cachedClient = connectionPool.get(address);
+
+ // Create the ClientPool if we don't have it yet.
+ ClientPool clientPool = connectionPool.get(address);
+ if (clientPool == null) {
+ clientPool = connectionPool.putIfAbsent(address, new ClientPool());
+ }
+
+ int clientIndex = rand.nextInt(numConnectionsPerPeer);
+ TransportClient cachedClient = clientPool.clients[clientIndex];
if (cachedClient != null) {
if (cachedClient.isActive()) {
logger.trace("Returning cached connection to {}: {}", address,
cachedClient);
return cachedClient;
} else {
logger.info("Found inactive connection to {}, closing it.",
address);
- connectionPool.remove(address, cachedClient); // Remove inactive
clients.
+ clientPool.clients[clientIndex] = null; // Remove inactive
clients.
--- End diff --
Shouldn't this be behind a lock?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]