This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 9da2cd4 GEODE-6536: Added retry in borrowConnection/single hop (#4719) 9da2cd4 is described below commit 9da2cd49e2e04564b446eaad579b51e986bc2179 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Thu Feb 27 07:38:31 2020 +0100 GEODE-6536: Added retry in borrowConnection/single hop (#4719) * GEODE-6536: Added retry in borrowConnection/single hop * GEODE-6536: bug fix * GEODE-6536: update after comments --- .../pooling/ConnectionManagerImplTest.java | 32 +++++++-------- .../pooling/ConnectionManagerJUnitTest.java | 6 +-- .../cache/client/internal/OpExecutorImpl.java | 2 +- .../geode/cache/client/internal/PoolImpl.java | 8 +++- .../client/internal/pooling/ConnectionManager.java | 5 ++- .../internal/pooling/ConnectionManagerImpl.java | 48 +++++++++++++++------- .../client/internal/OpExecutorImplJUnitTest.java | 2 +- 7 files changed, 64 insertions(+), 39 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java index 542a8fe..748f37b 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java @@ -94,7 +94,7 @@ public class ConnectionManagerImplTest { ServerLocation serverLocation = mock(ServerLocation.class); connectionManager = createDefaultConnectionManager(); - assertThatThrownBy(() -> connectionManager.borrowConnection(serverLocation, true)) + assertThatThrownBy(() -> connectionManager.borrowConnection(serverLocation, timeout, true)) .isInstanceOf(AllConnectionsInUseException.class); connectionManager.close(false); @@ -110,7 +110,7 @@ public class ConnectionManagerImplTest { 
connectionManager = createDefaultConnectionManager(); connectionManager.start(backgroundProcessor); - assertThat(connectionManager.borrowConnection(serverLocation, false)) + assertThat(connectionManager.borrowConnection(serverLocation, timeout, false)) .isInstanceOf(PooledConnection.class); assertThat(connectionManager.getConnectionCount()).isEqualTo(1); @@ -266,9 +266,9 @@ public class ConnectionManagerImplTest { cancelCriterion, poolStats); connectionManager.start(backgroundProcessor); - connectionManager.borrowConnection(serverLocation1, false); - connectionManager.borrowConnection(serverLocation2, false); - connectionManager.borrowConnection(serverLocation3, false); + connectionManager.borrowConnection(serverLocation1, timeout, false); + connectionManager.borrowConnection(serverLocation2, timeout, false); + connectionManager.borrowConnection(serverLocation3, timeout, false); assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections); @@ -295,9 +295,9 @@ public class ConnectionManagerImplTest { connectionManager = createDefaultConnectionManager(); connectionManager.start(backgroundProcessor); Connection heldConnection1 = - connectionManager.borrowConnection(serverLocation1, false); + connectionManager.borrowConnection(serverLocation1, timeout, false); Connection heldConnection2 = - connectionManager.borrowConnection(serverLocation2, false); + connectionManager.borrowConnection(serverLocation2, timeout, false); assertThat(connectionManager.getConnectionCount()).isEqualTo(2); connectionManager.returnConnection(heldConnection1, true); @@ -352,11 +352,11 @@ public class ConnectionManagerImplTest { connectionManager.start(backgroundProcessor); Connection heldConnection1 = - connectionManager.borrowConnection(serverLocation1, false); + connectionManager.borrowConnection(serverLocation1, timeout, false); Connection heldConnection2 = - connectionManager.borrowConnection(serverLocation2, false); + connectionManager.borrowConnection(serverLocation2, 
timeout, false); Connection heldConnection3 = - connectionManager.borrowConnection(serverLocation3, false); + connectionManager.borrowConnection(serverLocation3, timeout, false); assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections); @@ -391,7 +391,7 @@ public class ConnectionManagerImplTest { connectionManager = createDefaultConnectionManager(); connectionManager.start(backgroundProcessor); - Connection heldConnection = connectionManager.borrowConnection(serverLocation1, false); + Connection heldConnection = connectionManager.borrowConnection(serverLocation1, timeout, false); heldConnection = connectionManager.exchangeConnection(heldConnection, excluded); assertThat(heldConnection.getServer()).isEqualTo(connection2.getServer()); @@ -435,9 +435,9 @@ public class ConnectionManagerImplTest { cancelCriterion, poolStats); connectionManager.start(backgroundProcessor); - Connection heldConnection = connectionManager.borrowConnection(serverLocation1, false); - connectionManager.borrowConnection(serverLocation2, false); - connectionManager.borrowConnection(serverLocation3, false); + Connection heldConnection = connectionManager.borrowConnection(serverLocation1, timeout, false); + connectionManager.borrowConnection(serverLocation2, timeout, false); + connectionManager.borrowConnection(serverLocation3, timeout, false); assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections); heldConnection = connectionManager.exchangeConnection(heldConnection, excluded); @@ -470,9 +470,9 @@ public class ConnectionManagerImplTest { connectionManager.start(backgroundProcessor); Connection heldConnection1 = - connectionManager.borrowConnection(serverLocation1, false); + connectionManager.borrowConnection(serverLocation1, timeout, false); Connection heldConnection2 = - connectionManager.borrowConnection(serverLocation2, false); + connectionManager.borrowConnection(serverLocation2, timeout, false); 
connectionManager.returnConnection(heldConnection2); heldConnection2 = connectionManager.exchangeConnection(heldConnection1, excluded); diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java index 4f20d91..de00ce4 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java @@ -341,9 +341,9 @@ public class ConnectionManagerJUnitTest { // Ok, now get some connections that fill our queue Connection ping1 = - manager.borrowConnection(new ServerLocation("localhost", 5), false); + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, false); Connection ping2 = - manager.borrowConnection(new ServerLocation("localhost", 5), false); + manager.borrowConnection(new ServerLocation("localhost", 5), BORROW_TIMEOUT_MILLIS, false); manager.returnConnection(ping1); manager.returnConnection(ping2); @@ -692,7 +692,7 @@ public class ConnectionManagerJUnitTest { // do nothing } - Connection conn3 = manager.borrowConnection(new ServerLocation("localhost", -2), false); + Connection conn3 = manager.borrowConnection(new ServerLocation("localhost", -2), 10, false); Assert.assertEquals(2, factory.creates); Assert.assertEquals(0, factory.destroys); Assert.assertEquals(0, factory.closes); diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java index a1e377e..869869b 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java @@ -323,7 +323,7 
@@ public class OpExecutorImpl implements ExecutablePool { } } if (conn == null) { - conn = connectionManager.borrowConnection(p_server, onlyUseExistingCnx); + conn = connectionManager.borrowConnection(p_server, serverTimeout, onlyUseExistingCnx); } try { return executeWithPossibleReAuthentication(conn, op); diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java index 4e94a8d..c83fce3 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java @@ -918,10 +918,14 @@ public class PoolImpl implements InternalPool { } /** - * Test hook that acquires and returns a connection from the pool with a given ServerLocation. + * Borrows a connection to a specific server from the pool. Used by gateway and tests. Any + * connection + * that is acquired using this method must be returned using returnConnection, even if it is + * destroyed. + * */ public Connection acquireConnection(ServerLocation loc) { - return manager.borrowConnection(loc, false); + return manager.borrowConnection(loc, 15000L, false); } /** diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java index 7021cdd..8861821 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java @@ -56,17 +56,18 @@ public interface ConnectionManager { * no connection is available. * * @param server The server the connection needs to be to. + * @param aquireTimeout The amount of time to wait for a connection to become available, if + * onlyUseExistingCnx is set to true. 
* @param onlyUseExistingCnx if true, will not create a new connection if none are available. * @return A connection to use. * @throws AllConnectionsInUseException If there is no available connection on the desired server, * and onlyUseExistingCnx is set. - * @throws ServerOperationException If there is an issue creating the connection due to security * @throws NoAvailableServersException If we can't connect to any server * @throws ServerConnectivityException If finding a connection and creating a connection both fail * to return a connection * */ - Connection borrowConnection(ServerLocation server, boolean onlyUseExistingCnx) + Connection borrowConnection(ServerLocation server, long aquireTimeout, boolean onlyUseExistingCnx) throws AllConnectionsInUseException, NoAvailableServersException; /** diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java index b3bffc9..84d9570 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java @@ -301,31 +301,51 @@ public class ConnectionManagerImpl implements ConnectionManager { throw new AllConnectionsInUseException(); } - /** - * Borrow a connection to a specific server. This task currently allows us to break the connection - * limit, because it is used by tasks from the background thread that shouldn't be constrained by - * the limit. They will only violate the limit by 1 connection, and that connection will be - * destroyed when returned to the pool. 
- */ @Override - public PooledConnection borrowConnection(ServerLocation server, - boolean onlyUseExistingCnx) throws AllConnectionsInUseException, NoAvailableServersException { + public PooledConnection borrowConnection(ServerLocation server, long acquireTimeout, + boolean onlyUseExistingCnx) + throws AllConnectionsInUseException, NoAvailableServersException, + ServerConnectivityException { + PooledConnection connection = availableConnectionManager.useFirst((c) -> c.getServer().equals(server)); if (null != connection) { return connection; } - if (onlyUseExistingCnx) { - throw new AllConnectionsInUseException(); + if (!onlyUseExistingCnx) { + connection = forceCreateConnection(server); + if (null != connection) { + return connection; + } + throw new ServerConnectivityException(BORROW_CONN_ERROR_MSG + server); } - connection = forceCreateConnection(server); - if (null != connection) { - return connection; + long waitStart = NOT_WAITING; + try { + long timeout = System.nanoTime() + MILLISECONDS.toNanos(acquireTimeout); + while (true) { + connection = + availableConnectionManager.useFirst((c) -> c.getServer().equals(server)); + if (null != connection) { + return connection; + } + + if (checkShutdownInterruptedOrTimeout(timeout)) { + break; + } + + waitStart = beginConnectionWaitStatIfNotStarted(waitStart); + + Thread.yield(); + } + } finally { + endConnectionWaitStatIfStarted(waitStart); } - throw new ServerConnectivityException(BORROW_CONN_ERROR_MSG + server); + cancelCriterion.checkCancelInProgress(null); + + throw new AllConnectionsInUseException(); } @Override diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java index 752ba93..068001a 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java +++ 
b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java @@ -453,7 +453,7 @@ public class OpExecutorImplJUnitTest { } @Override - public Connection borrowConnection(ServerLocation server, + public Connection borrowConnection(ServerLocation server, long aquireTimeout, boolean onlyUseExistingCnx) { borrows++; return new DummyConnection(server);