This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 4fbc35c29e GEODE-10097: Avoid Thread.sleep for re-auth in MessageDispatcher (#7556) 4fbc35c29e is described below commit 4fbc35c29ef131f8d8f1f82391d224f3c2bbc346 Author: Jinmei Liao <jil...@pivotal.io> AuthorDate: Mon Apr 11 08:48:34 2022 -0700 GEODE-10097: Avoid Thread.sleep for re-auth in MessageDispatcher (#7556) --- .../security/AuthExpirationDistributedTest.java | 9 ++- .../cache/tier/sockets/CacheClientProxy.java | 7 ++ .../cache/tier/sockets/MessageDispatcher.java | 83 ++++++++++++++-------- .../cache/tier/sockets/ServerConnection.java | 1 + .../cache/tier/sockets/MessageDispatcherTest.java | 41 +++++++++-- .../geode/security/ExpirableSecurityManager.java | 8 ++- .../sanctioned-geode-junit-serializables.txt | 2 +- 7 files changed, 111 insertions(+), 40 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java index 8a3a1de11b..d1bb73e2c9 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.contrib.java.lang.system.RestoreSystemProperties; @@ -65,6 +66,12 @@ public class AuthExpirationDistributedTest { private ClientVM clientVM; + @Before + public void before() throws Exception { + // this is enabled to show how many times authorize call is made with each permission key + getSecurityManager().setAllowDuplicate(true); + } + @After public void after() { if (clientVM != null) { @@ -109,7 +116,7 @@ public class AuthExpirationDistributedTest { Map<String, List<String>> authorizedOps = getSecurityManager().getAuthorizedOps(); assertThat(authorizedOps.keySet().size()).isEqualTo(2); assertThat(authorizedOps.get("user1")).asList().containsExactly("DATA:READ:region", - "DATA:READ:region:1"); + "DATA:READ:region", "DATA:READ:region:1"); assertThat(authorizedOps.get("user2")).asList().containsExactly("DATA:READ:region:2"); Map<String, List<String>> unAuthorizedOps = getSecurityManager().getUnAuthorizedOps(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java index 43be4421e7..89c14a8be3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java @@ -723,6 +723,13 @@ public class CacheClientProxy implements ClientSession { return _messageDispatcher.isWaitingForReAuthentication(); } + public void notifyReAuthentication() { + if (_messageDispatcher == null) { + return; + } + _messageDispatcher.notifyReAuthentication(); + } + /** * Returns whether the proxy is paused. It is paused if its message dispatcher is paused. This * only applies to durable clients. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java index 4c88ee20d8..a03ab4bf81 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java @@ -113,6 +113,7 @@ public class MessageDispatcher extends LoggingThread { private volatile boolean _isStopped = true; private volatile long waitForReAuthenticationStartTime = -1; + private final Object reAuthenticationLock = new Object(); /** * A lock object used to control pausing this dispatcher @@ -193,6 +194,15 @@ public class MessageDispatcher extends LoggingThread { return waitForReAuthenticationStartTime > 0; } + private boolean subjectUpdated = false; + + public void notifyReAuthentication() { + synchronized (reAuthenticationLock) { + subjectUpdated = true; + reAuthenticationLock.notifyAll(); + } + } + private CacheClientProxy getProxy() { return _proxy; } @@ -361,9 +371,6 @@ public class MessageDispatcher extends LoggingThread { logger.debug("{}: Beginning to process events", this); } - long reAuthenticateWaitTime = - getSystemProperty(RE_AUTHENTICATE_WAIT_TIME, DEFAULT_RE_AUTHENTICATE_WAIT_TIME); - ClientMessage clientMessage = null; while (!isStopped()) { @@ -432,34 +439,8 @@ public class MessageDispatcher extends LoggingThread { _messageQueue.remove(); clientMessage = null; } catch (AuthenticationExpiredException expired) { - if (waitForReAuthenticationStartTime == -1) { - waitForReAuthenticationStartTime = System.currentTimeMillis(); - // only send the message to clients who can handle the message - if (getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION)) { - EventID eventId = createEventId(); - sendMessageDirectly(new ClientReAuthenticateMessage(eventId)); - } - // We wait for all versions of clients to re-authenticate. For older clients we still - // wait, just in case client will perform some operations to - // trigger credential refresh on its own. - Thread.sleep(200); - } else { - long elapsedTime = System.currentTimeMillis() - waitForReAuthenticationStartTime; - if (elapsedTime > reAuthenticateWaitTime) { - // reset the timer here since we are no longer waiting for re-auth to happen anymore - waitForReAuthenticationStartTime = -1; - synchronized (_stopDispatchingLock) { - logger.warn("Client did not re-authenticate back successfully in " + elapsedTime - + "ms. Unregister this client proxy."); - pauseOrUnregisterProxy(expired); - } - exceptionOccurred = true; - } else { - Thread.sleep(200); - } - } + exceptionOccurred = handleAuthenticationExpiredException(expired); } - } catch (MessageTooLargeException e) { logger.warn("Message too large to send to client: {}, {}", clientMessage, e.getMessage()); } catch (IOException e) { @@ -548,6 +529,48 @@ public class MessageDispatcher extends LoggingThread { } } + private boolean handleAuthenticationExpiredException(AuthenticationExpiredException expired) + throws InterruptedException { + long reAuthenticateWaitTime = + getSystemProperty(RE_AUTHENTICATE_WAIT_TIME, DEFAULT_RE_AUTHENTICATE_WAIT_TIME); + synchronized (reAuthenticationLock) { + // turn on the "isWaitingForReAuthentication" flag before we send the re-auth message + // if we do it the other way around, the re-auth might be finished before we turn on the + // flag for the notification to happen. + waitForReAuthenticationStartTime = System.currentTimeMillis(); + subjectUpdated = false; + // only send the message to clients who can handle the message + if (getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION)) { + EventID eventId = createEventId(); + sendMessageDirectly(new ClientReAuthenticateMessage(eventId)); + } + + // We wait for all versions of clients to re-authenticate. For older clients we still + // wait, just in case client will perform some operations to + // trigger credential refresh on its own. + long waitFinishTime = waitForReAuthenticationStartTime + reAuthenticateWaitTime; + long remainingWaitTime = waitFinishTime - System.currentTimeMillis(); + while (!subjectUpdated && remainingWaitTime > 0) { + reAuthenticationLock.wait(remainingWaitTime); + remainingWaitTime = waitFinishTime - System.currentTimeMillis(); + } + } + // the above wait timed out + if (!subjectUpdated) { + long elapsedTime = System.currentTimeMillis() - waitForReAuthenticationStartTime; + // reset the timer here since we are no longer waiting for re-auth to happen anymore + waitForReAuthenticationStartTime = -1; + synchronized (_stopDispatchingLock) { + logger.warn( + "Client did not re-authenticate back successfully in {} ms. Unregister this client proxy.", + elapsedTime); + pauseOrUnregisterProxy(expired); + return true; + } + } + return false; + } + @VisibleForTesting void dispatchResidualMessages() { List<ClientMessage> list = new ArrayList<>(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index f39ad719a0..004ebd4730 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -1231,6 +1231,7 @@ public class ServerConnection implements Runnable { secureLogger.debug("update subject on client proxy {} with uniqueId {}", clientProxy, uniqueId); clientProxy.setSubject(subject); + clientProxy.notifyReAuthentication(); } return uniqueId; } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java index 555e3f3556..a01bc7bf1c 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java @@ -18,6 +18,7 @@ package org.apache.geode.internal.cache.tier.sockets; import static org.apache.geode.internal.lang.SystemPropertyHelper.RE_AUTHENTICATE_WAIT_TIME; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -35,8 +36,8 @@ import static org.mockito.Mockito.when; import java.io.IOException; import org.apache.shiro.subject.Subject; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.apache.geode.CancelCriterion; import org.apache.geode.cache.RegionDestroyedException; @@ -65,7 +66,7 @@ public class MessageDispatcherTest { private CacheClientProxyStats proxyStats; private EventID eventID; - @Before + @BeforeEach public void before() throws Exception { proxy = mock(CacheClientProxy.class); message = mock(ClientUpdateMessageImpl.class); @@ -137,7 +138,7 @@ public class MessageDispatcherTest { @Test public void newClientWillGetClientReAuthenticateMessage() throws Exception { - doReturn(false, false, false, false, false, true).when(dispatcher).isStopped(); + doReturn(false, false, false, true).when(dispatcher).isStopped(); doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any()); when(messageQueue.peek()).thenReturn(message); when(proxy.getVersion()).thenReturn(KnownVersion.GEODE_1_15_0); @@ -145,7 +146,7 @@ public class MessageDispatcherTest { doNothing().when(dispatcher).sendMessageDirectly(any()); // make sure wait time is short - doReturn(-1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), anyLong()); + doReturn(1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), anyLong()); dispatcher.runDispatcher(); // verify a ReAuthenticate message will be send to the user @@ -159,9 +160,9 @@ public class MessageDispatcherTest { @Test public void oldClientWillNotGetClientReAuthenticateMessage() throws Exception { - doReturn(false, false, false, false, false, true).when(dispatcher).isStopped(); + doReturn(false, false, true).when(dispatcher).isStopped(); // make sure wait time is short - doReturn(-1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), anyLong()); + doReturn(1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), anyLong()); doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any()); when(messageQueue.peek()).thenReturn(message); @@ -173,6 +174,32 @@ public class MessageDispatcherTest { verify(dispatcher, never()).dispatchResidualMessages(); } + + @Test + public void oldClientWillContinueToDeliverMessageIfNotified() throws Exception { + doReturn(false, false, true).when(dispatcher).isStopped(); + // make sure wait time is short + doReturn(10000L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), anyLong()); + doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any()); + when(messageQueue.peek()).thenReturn(message); + when(proxy.getVersion()).thenReturn(KnownVersion.GEODE_1_14_0); + + Thread dispatcherThread = new Thread(() -> dispatcher.runDispatcher()); + Thread notifyThread = new Thread(() -> dispatcher.notifyReAuthentication()); + + dispatcherThread.start(); + await().until(() -> dispatcher.isWaitingForReAuthentication()); + notifyThread.start(); + + dispatcherThread.join(); + notifyThread.join(); + + verify(dispatcher, never()).sendMessageDirectly(any()); + // dispatcher will dispatch message + verify(dispatcher, never()).pauseOrUnregisterProxy(any(AuthenticationExpiredException.class)); + verify(dispatcher).dispatchResidualMessages(); + } + @Test public void ioExceptionHappenedForDurableClientWillContinueToPeekForNextMessage() throws Exception { diff --git a/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java b/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java index 06b2845145..56c5157270 100644 --- a/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java +++ b/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java @@ -39,6 +39,7 @@ public class ExpirableSecurityManager extends SimpleSecurityManager implements S new ConcurrentHashMap<>(); private final Map<String, List<String>> unauthorizedOps = new ConcurrentHashMap<>(); + private boolean allowDuplicate = false; @Override public Object authenticate(final Properties credentials) throws AuthenticationFailedException { @@ -65,6 +66,10 @@ public class ExpirableSecurityManager extends SimpleSecurityManager implements S expired_users.add(user); } + public void setAllowDuplicate(boolean allowDuplicate) { + this.allowDuplicate = allowDuplicate; + } + public Set<String> getExpiredUsers() { return expired_users; } @@ -83,7 +88,7 @@ public class ExpirableSecurityManager extends SimpleSecurityManager implements S if (list == null) { list = new ArrayList<>(); } - if (!list.contains(permission.toString())) { + if (allowDuplicate || !list.contains(permission.toString())) { list.add(permission.toString()); } maps.put(user.toString(), list); @@ -93,6 +98,7 @@ public class ExpirableSecurityManager extends SimpleSecurityManager implements S expired_users.clear(); authorizedOps.clear(); unauthorizedOps.clear(); + allowDuplicate = false; } } diff --git a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt index 6249fc65a3..97e1d24230 100644 --- a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt +++ b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt @@ -69,7 +69,7 @@ org/apache/geode/pdx/Day,false org/apache/geode/pdx/DomainObjectPdxAuto$Day,false org/apache/geode/pdx/DomainObjectPdxAutoNoDefaultConstructor$Day,false org/apache/geode/pdx/SimpleClass$SimpleEnum,false -org/apache/geode/security/ExpirableSecurityManager,false,authorizedOps:java/util/Map,expired_users:java/util/Set,unauthorizedOps:java/util/Map +org/apache/geode/security/ExpirableSecurityManager,false,allowDuplicate:boolean,authorizedOps:java/util/Map,expired_users:java/util/Set,unauthorizedOps:java/util/Map org/apache/geode/security/query/data/PdxQueryTestObject,false,age:int,id:int,name:java/lang/String,shouldThrowException:boolean org/apache/geode/security/query/data/PdxTrade,false,cusip:java/lang/String,id:java/lang/String,price:int,shares:int org/apache/geode/security/query/data/QueryTestObject,false,dateField:java/util/Date,id:int,mapField:java/util/Map,name:java/lang/String