Bill commented on a change in pull request #6835:
URL: https://github.com/apache/geode/pull/6835#discussion_r708663990



##########
File path: 
geode-core/src/main/java/org/apache/geode/cache/client/internal/AuthenticateUserOp.java
##########
@@ -123,67 +119,89 @@ private AuthenticateUserOp() {
       super(MessageType.USER_CREDENTIAL_MESSAGE, 1);
       securityProperties = securityProps;
       needsServerLocation = needsServer;
-
       getMessage().setMessageHasSecurePartFlag();
     }
 
     @Override
-    protected void sendMessage(Connection cnx) throws Exception {
-      HeapDataOutputStream hdos = new HeapDataOutputStream(KnownVersion.CURRENT);
-      byte[] secureBytes;
-      hdos.writeLong(cnx.getConnectionID());
-      if (securityProperties != null) {
-        DistributedMember server = new InternalDistributedMember(cnx.getSocket().getInetAddress(),
-            cnx.getSocket().getPort(), false);
-        DistributedSystem sys = InternalDistributedSystem.getConnectedInstance();
-        String authInitMethod = sys.getProperties().getProperty(SECURITY_CLIENT_AUTH_INIT);
-
-        Properties credentials = Handshake.getCredentials(authInitMethod, securityProperties,
-            server, false, sys.getLogWriter(), sys.getSecurityLogWriter());
-        byte[] credentialBytes;
-        try (HeapDataOutputStream heapdos = new HeapDataOutputStream(KnownVersion.CURRENT)) {
-          DataSerializer.writeProperties(credentials, heapdos);
-          credentialBytes = ((ConnectionImpl) cnx).encryptBytes(heapdos.toByteArray());
-        }
-        getMessage().addBytesPart(credentialBytes);
+    protected void sendMessage(Connection connection) throws Exception {
+      if (securityProperties == null) {
+        securityProperties = getConnectedSystem().getSecurityProperties();
       }
-      try {
-        secureBytes = ((ConnectionImpl) cnx).encryptBytes(hdos.toByteArray());
-      } finally {
-        hdos.close();
+      byte[] credentialBytes = getCredentialBytes(connection, securityProperties);
+      getMessage().addBytesPart(credentialBytes);
+
+      try (HeapDataOutputStream hdos = new HeapDataOutputStream(16, KnownVersion.CURRENT)) {
+        hdos.writeLong(connection.getConnectionID());
+        hdos.writeLong(getUserId(connection));
+        getMessage().setSecurePart(((ConnectionImpl) connection).encryptBytes(hdos.toByteArray()));
       }
-      getMessage().setSecurePart(secureBytes);
       getMessage().send(false);
     }
 
+    protected long getUserId(Connection connection) {
+      // single user mode
+      if (UserAttributes.userAttributes.get() == null) {
+        return connection.getServer().getUserId();
+      }
+      // multi user mode
+      Long id = UserAttributes.userAttributes.get().getServerToId().get(connection.getServer());
+      if (id == null) {
+        return -1L;

Review comment:
       There are a lot of `-1L` and `-1` literals added in this PR. Have you 
considered adding a constant such as `NOT_A_USER_ID`?
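
A minimal sketch of what such a constant could look like, using stand-in types so that it compiles on its own. The class name `UserIdLookup`, the `Map<String, Long>` in place of `getServerToId()`, and the string server names are assumptions for illustration; only the name `NOT_A_USER_ID` comes from the comment above.

```java
import java.util.HashMap;
import java.util.Map;

class UserIdLookup {
  // Hypothetical constant; the name comes from the review suggestion, not from Geode.
  static final long NOT_A_USER_ID = -1L;

  // Stand-in for the multi-user lookup: the map plays the role of getServerToId(),
  // and plain strings stand in for server locations.
  static long getUserId(Map<String, Long> serverToId, String server) {
    Long id = serverToId.get(server);
    return id == null ? NOT_A_USER_ID : id;
  }

  public static void main(String[] args) {
    Map<String, Long> serverToId = new HashMap<>();
    serverToId.put("server-1", 42L);
    System.out.println(getUserId(serverToId, "server-1"));
    System.out.println(getUserId(serverToId, "server-2") == NOT_A_USER_ID);
  }
}
```

Callers can then compare against `NOT_A_USER_ID` instead of repeating the literal, which also makes every use of the sentinel easy to find.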

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientReAuthenticateMessage.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.tier.MessageType;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.internal.serialization.SerializationContext;
+
+public class ClientReAuthenticateMessage implements ClientMessage {
+
+  /**
+   * This {@code ClientMessage}'s {@code EventID}
+   */
+  private final EventID eventId;
+
+  public ClientReAuthenticateMessage() {
+    this(new EventID());
+  }
+
+  public ClientReAuthenticateMessage(EventID eventId) {
+    this.eventId = eventId;
+  }
+
+  @Override
+  public Message getMessage(CacheClientProxy proxy, boolean notify) throws IOException {
+    Message message = new Message(1, KnownVersion.CURRENT);
+    message.setMessageType(MessageType.CLIENT_RE_AUTHENTICATE);
+    message.setTransactionId(0);
+    message.addObjPart(eventId);
+    return message;
+  }
+
+  @Override
+  public boolean shouldBeConflated() {
+    return true;
+  }
+
+  @Override
+  public void toData(DataOutput out,
+      SerializationContext context) throws IOException {
+    eventId.toData(out, context);
+  }
+
+  @Override
+  public int getDSFID() {
+    return CLIENT_RE_AUTHENTICATE;
+  }
+
+  @Override
+  public void fromData(DataInput in,
+      DeserializationContext context) throws IOException, ClassNotFoundException {
+    eventId.fromData(in, context);
+  }
+
+  @Override
+  public EventID getEventId() {
+    return eventId;
+  }
+
+  @Override
+  public String getRegionToConflate() {
+    return "gemfire_reserved_region_name_for_client_re_authenticate";
+  }
+
+  @Override
+  public Object getKeyToConflate() {
+    // This method can be called by HARegionQueue.
+    // Use this to identify the message type.
+    return "re_authenticate";
+  }
+
+  @Override
+  public Object getValueToConflate() {
+    // This method can be called by HARegionQueue
+    // Use this to identify the message type.
+    return "re_authenticate";
+  }
+
+  @Override
+  public void setLatestValue(Object value) {}

Review comment:
       Should this throw `UnsupportedOperationException` the way 
`GatewaySenderEventImpl` does, instead of silently doing nothing?
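
A minimal sketch of the throwing variant, assuming nothing on the conflation path actually calls this method for a re-authenticate message (that would need checking before adopting it). The class name `ReAuthenticateMessageSketch` and the exception text are hypothetical; the sketch does not implement Geode's `ClientMessage` so that it stays self-contained.

```java
class ReAuthenticateMessageSketch {
  // Stand-in for ClientMessage#setLatestValue. This message has no value to
  // replace, so a call is rejected loudly rather than ignored.
  public void setLatestValue(Object value) {
    throw new UnsupportedOperationException(
        "setLatestValue is not supported for a re-authenticate message");
  }

  public static void main(String[] args) {
    try {
      new ReAuthenticateMessageSketch().setLatestValue("ignored");
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```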

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
##########
@@ -124,41 +141,34 @@
    */
   protected MessageDispatcher(CacheClientProxy proxy, String name,
       StatisticsClock statisticsClock) throws CacheException {
-    super(name);
-
-    _proxy = proxy;
-
-    // Create the event conflator
-    // this._eventConflator = new BridgeEventConflator
+    this(proxy, name, getMessageQueue(proxy, statisticsClock));
+  }
 
-    // Create the message queue
+  private static HARegionQueue getMessageQueue(CacheClientProxy proxy,
+      StatisticsClock statisticsClock) {

Review comment:
       Nice cleanup of the constructor!

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
##########
@@ -362,31 +387,67 @@ protected void runDispatcher() {
           }
           waitForResumption();
         }
-        try {
-          clientMessage = (ClientMessage) _messageQueue.peek();
-        } catch (RegionDestroyedException skipped) {
-          break;
+
+        // if message is not delivered due to authentication expiation, this clientMessage
+        // would not be null.
+        if (clientMessage == null) {
+          try {
+            clientMessage = (ClientMessage) _messageQueue.peek();
+          } catch (RegionDestroyedException skipped) {
+            break;
+          }
         }
+
         getStatistics().setQueueSize(_messageQueue.size());
         if (isStopped()) {
           break;
         }
-        if (clientMessage != null) {
-          // Process the message
-          long start = getStatistics().startTime();
-          //// BUGFIX for BUG#38206 and BUG#37791
-          boolean isDispatched = dispatchMessage(clientMessage);
-          getStatistics().endMessage(start);
-          if (isDispatched) {
+
+        if (clientMessage == null) {
+          _messageQueue.remove();
+          continue;
+        }
+
+        // Process the message
+        long start = getStatistics().startTime();
+        try {
+          if (dispatchMessage(clientMessage)) {
+            getStatistics().endMessage(start);
             _messageQueue.remove();
             if (clientMessage instanceof ClientMarkerMessageImpl) {
               getProxy().setMarkerEnqueued(false);
             }
           }
-        } else {
+          clientMessage = null;
+          wait_for_re_auth_start_time = -1;
+        } catch (NotAuthorizedException notAuthorized) {
+          // behave as if the message is dispatched, remove from the queue
+          logger.info("skip delivering message: " + clientMessage, notAuthorized);
           _messageQueue.remove();
+          clientMessage = null;
+        } catch (AuthenticationExpiredException expired) {
+          if (wait_for_re_auth_start_time == -1) {
+            wait_for_re_auth_start_time = System.currentTimeMillis();
+            // only send the message to clients who can handle the message
+            if (getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION)) {
+              EventID eventId = createEventId();
+              sendMessageDirectly(new ClientReAuthenticateMessage(eventId));
+            }
+            // for older client, we still wait, just in case client will perform some operations to
+            // trigger credential refresh on its own.
+            Thread.sleep(200);
+          } else {
+            long elapsedTime = System.currentTimeMillis() - wait_for_re_auth_start_time;
+            if (elapsedTime > reAuthenticateWaitTime) {
+              logger.warn("Client did not re-authenticate back successfully in " + elapsedTime
+                  + "ms. Unregister this client proxy.");
+              pauseOrUnregisterProxy(expired);
+            } else {
+              Thread.sleep(200);

Review comment:
       I don't understand why the `sleep()` is needed here. Please explain.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
##########
@@ -335,10 +356,14 @@ protected void runDispatcher() {
       logger.debug("{}: Beginning to process events", this);
     }
 
+    long reAuthenticateWaitTime =
+        getSystemProperty(RE_AUTHENTICATE_WAIT_TIME, DEFAULT_RE_AUTHENTICATE_WAIT_TIME);
+
     ClientMessage clientMessage = null;
+    long wait_for_re_auth_start_time = -1;

Review comment:
       Please consider moving this variable declaration closer to where it's 
needed. It can be moved inside the `while` body, down to just before the `try` 
block at line 413.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
##########
@@ -362,31 +387,67 @@ protected void runDispatcher() {
           }
           waitForResumption();
         }
-        try {
-          clientMessage = (ClientMessage) _messageQueue.peek();
-        } catch (RegionDestroyedException skipped) {
-          break;
+
+        // if message is not delivered due to authentication expiation, this clientMessage
+        // would not be null.
+        if (clientMessage == null) {
+          try {
+            clientMessage = (ClientMessage) _messageQueue.peek();
+          } catch (RegionDestroyedException skipped) {
+            break;
+          }
         }
+
         getStatistics().setQueueSize(_messageQueue.size());
         if (isStopped()) {
           break;
         }
-        if (clientMessage != null) {
-          // Process the message
-          long start = getStatistics().startTime();
-          //// BUGFIX for BUG#38206 and BUG#37791
-          boolean isDispatched = dispatchMessage(clientMessage);
-          getStatistics().endMessage(start);
-          if (isDispatched) {
+
+        if (clientMessage == null) {
+          _messageQueue.remove();
+          continue;
+        }
+
+        // Process the message
+        long start = getStatistics().startTime();
+        try {
+          if (dispatchMessage(clientMessage)) {
+            getStatistics().endMessage(start);
             _messageQueue.remove();
             if (clientMessage instanceof ClientMarkerMessageImpl) {
               getProxy().setMarkerEnqueued(false);
             }
           }
-        } else {
+          clientMessage = null;
+          wait_for_re_auth_start_time = -1;
+        } catch (NotAuthorizedException notAuthorized) {
+          // behave as if the message is dispatched, remove from the queue
+          logger.info("skip delivering message: " + clientMessage, notAuthorized);
           _messageQueue.remove();
+          clientMessage = null;
+        } catch (AuthenticationExpiredException expired) {
+          if (wait_for_re_auth_start_time == -1) {
+            wait_for_re_auth_start_time = System.currentTimeMillis();
+            // only send the message to clients who can handle the message
+            if (getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION)) {
+              EventID eventId = createEventId();
+              sendMessageDirectly(new ClientReAuthenticateMessage(eventId));
+            }
+            // for older client, we still wait, just in case client will perform some operations to
+            // trigger credential refresh on its own.

Review comment:
       The comment makes me think that this `Thread.sleep()` should be in an 
`else` clause, so that we sleep only if the client doesn't understand the 
re-authenticate message.
   
   Consider moving this sleep into an `else` block.
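
A rough sketch of the suggested shape, assuming the sleep really is meant only for clients that cannot handle the re-authenticate message. All names here (`ReAuthWaitSketch`, `onFirstExpiry`, `RETRY_SLEEP_MILLIS`) are hypothetical; the boolean stands in for the version check and the `Runnable` for `sendMessageDirectly(...)`. The method returns the sleep duration instead of sleeping so that the branch logic is easy to see.

```java
class ReAuthWaitSketch {
  // Same value as the diff's Thread.sleep(200); the constant name is hypothetical.
  static final long RETRY_SLEEP_MILLIS = 200L;

  // Decides what to do the first time an expired credential is seen.
  // Returns the number of milliseconds the dispatcher would then sleep.
  static long onFirstExpiry(boolean clientHandlesReAuth, Runnable notifyClient) {
    if (clientHandlesReAuth) {
      // Newer client: ask it to re-authenticate and retry right away.
      notifyClient.run();
      return 0L;
    } else {
      // Older client: nothing to send, so just wait for it to refresh on its own.
      return RETRY_SLEEP_MILLIS;
    }
  }

  public static void main(String[] args) {
    System.out.println(onFirstExpiry(true, () -> System.out.println("re-auth sent")));
    System.out.println(onFirstExpiry(false, () -> System.out.println("re-auth sent")));
  }
}
```

With this shape a newer client would not wait on the first pass; in the diff as quoted, the following iterations would still sleep in the outer `else` branch while the wait time has not elapsed.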

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
##########
@@ -1164,6 +1190,20 @@ public void removeUserAuth(Message message, boolean keepAlive) {
     }
   }
 
+  @VisibleForTesting
+  long putSubject(Subject subject, long existingUniqueId) {
+    long uniqueId;
+    uniqueId = clientUserAuths.putSubject(subject, existingUniqueId);

Review comment:
       Please combine the assignment on this line with the declaration on the 
previous one. 
   
   Incidentally, `uniqueId` can be `final`.
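
For illustration, the combined form might look like the sketch below. `SubjectStore` is a hypothetical stand-in for whatever type `clientUserAuths` has, `Object` replaces `Subject` to keep the example self-contained, and returning `uniqueId` is an assumption because the rest of the method is not shown in the hunk.

```java
class PutSubjectSketch {
  // Hypothetical stand-in for the clientUserAuths collaborator.
  interface SubjectStore {
    long putSubject(Object subject, long existingUniqueId);
  }

  private final SubjectStore clientUserAuths;

  PutSubjectSketch(SubjectStore clientUserAuths) {
    this.clientUserAuths = clientUserAuths;
  }

  long putSubject(Object subject, long existingUniqueId) {
    // Declaration and assignment combined, and the local made final.
    final long uniqueId = clientUserAuths.putSubject(subject, existingUniqueId);
    // The remainder of the real method is elided in the diff; this return is assumed.
    return uniqueId;
  }
}
```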

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
##########
@@ -581,6 +656,33 @@ protected boolean dispatchMessage(ClientMessage clientMessage) throws IOExceptio
         logger.trace(msg.toString());
       }
 
+      // authorize the message before dispatching
+      String regionName = clientUpdateMessage.getRegionName();
+      Object key = clientUpdateMessage.getKeyOfInterest();
+      SecurityService securityService = getCache().getSecurityService();
+      ResourcePermission permission = new ResourcePermission(ResourcePermission.Resource.DATA,
+          ResourcePermission.Operation.READ,
+          regionName, key == null ? null : key.toString());

Review comment:
       `permission` isn't needed outside the body of the next `if`. And even 
then it is only needed conditionally (sometimes it isn't needed at all). And 
`regionName` and `key` are used only to compute `permission`.
   
   Please consider replacing lines 660, 661, 663-665 with a method that takes 
`clientUpdateMessage` and returns a `ResourcePermission`, and then call that 
method in the two places where `permission` is currently used below. This will 
eliminate some spurious object allocation and make this code a little simpler.
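
A sketch of such a helper, written against stand-in types so that it compiles without Geode on the classpath. `Permission` mimics the four-argument `ResourcePermission` constructor used in the diff, `UpdateMessage` exposes only the two accessors the diff calls, and the helper name `readPermissionFor` is hypothetical.

```java
class PermissionSketch {
  // Minimal stand-in for ResourcePermission(resource, operation, target, key).
  static final class Permission {
    final String resource;
    final String operation;
    final String target;
    final String key;

    Permission(String resource, String operation, String target, String key) {
      this.resource = resource;
      this.operation = operation;
      this.target = target;
      this.key = key;
    }
  }

  // Stand-in for the two accessors of the update message used in the diff.
  interface UpdateMessage {
    String getRegionName();

    Object getKeyOfInterest();
  }

  // Builds the DATA:READ permission only when a caller actually needs it.
  static Permission readPermissionFor(UpdateMessage message) {
    Object key = message.getKeyOfInterest();
    return new Permission("DATA", "READ", message.getRegionName(),
        key == null ? null : key.toString());
  }
}
```

Because the permission is built inside the helper, the `regionName` and `key` locals disappear from `dispatchMessage`, and no permission object is allocated on paths that never check it.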

##########
File path: 
geode-core/src/main/java/org/apache/geode/cache/client/internal/AuthenticateUserOp.java
##########
@@ -87,31 +101,13 @@ private AuthenticateUserOp() {
   }
 
   static class AuthenticateUserOpImpl extends AbstractOp {
-
+    private static final Logger logger = LogService.getLogger();

Review comment:
       Please remove this unused variable.



