This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch branch-1.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1.3 by this push:
     new 0d5baff  HBASE-22492 Wrap RPC responses with SASL after queueing for 
response (Sébastien Barnoud)
0d5baff is described below

commit 0d5baffd1beb2acdcd989c5c611bab8ce78b0f09
Author: Josh Elser <els...@apache.org>
AuthorDate: Fri Jun 21 16:29:08 2019 -0400

    HBASE-22492 Wrap RPC responses with SASL after queueing for response 
(Sébastien Barnoud)
    
    Amending-Author: Josh Elser <els...@apache.org>
    Signed-off-by: Andrew Purtell <apurt...@apache.org>
    Signed-off-by: Josh Elser <els...@apache.org>
---
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     | 38 ++++++++++++++++------
 1 file changed, 28 insertions(+), 10 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 975309c..0bff9ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -351,6 +351,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
 
     private User user;
     private InetAddress remoteAddress;
+    private boolean saslWrapDone;
 
     private long responseCellSize = 0;
     private long responseBlockSize = 0;
@@ -378,6 +379,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       this.tinfo = tinfo;
       this.user = connection == null? null: connection.user; // FindBugs: 
NP_NULL_ON_SOME_PATH
       this.remoteAddress = remoteAddress;
+      this.saslWrapDone = false;
       this.retryImmediatelySupported =
           connection == null? null: connection.retryImmediatelySupported;
       this.timeout = timeout;
@@ -486,10 +488,6 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
         byte[] b = createHeaderAndMessageBytes(result, header);
 
         bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
-
-        if (connection.useWrap) {
-          bc = wrapWithSasl(bc);
-        }
       } catch (IOException e) {
         LOG.warn("Exception while creating response " + e);
       }
@@ -534,6 +532,18 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       return b;
     }
 
+    private synchronized void wrapWithSasl() throws IOException {
+      // do it only once per call
+      if (saslWrapDone) {
+        return;
+      }
+      response = wrapWithSasl(response);
+      saslWrapDone = true;
+    }
+
+    /**
+     * Do not call directly, invoke via {@link #wrapWithSasl()}.
+     */
     private BufferChain wrapWithSasl(BufferChain bc)
         throws IOException {
       if (!this.connection.useSasl) return bc;
@@ -541,11 +551,11 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       // THIS IS A BIG UGLY COPY.
       byte [] responseBytes = bc.getBytes();
       byte [] token;
-      // synchronization may be needed since there can be multiple Handler
-      // threads using saslServer to wrap responses.
-      synchronized (connection.saslServer) {
-        token = connection.saslServer.wrap(responseBytes, 0, 
responseBytes.length);
-      }
+
+      // Previously, synchronization was needed since there could be multiple 
Handler
+      // threads using saslServer to wrap responses. However, now we wrap the 
response
+      // inside of the Responder thread to avoid sending back mis-ordered SASL 
messages.
+      token = connection.saslServer.wrap(responseBytes, 0, 
responseBytes.length);
       if (LOG.isTraceEnabled()) {
         LOG.trace("Adding saslServer wrapped token of size " + token.length
             + " as call response.");
@@ -1191,6 +1201,11 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     private boolean processResponse(final Call call) throws IOException {
       boolean error = true;
       try {
+        // Wrap the message "late" in SASL to ensure that the sequence number 
matches the order of
+        // responses we write out.
+        if (call.connection.useWrap) {
+          call.wrapWithSasl();
+        }
         // Send as much data as we can in the non-blocking fashion
         long numBytes = channelWrite(call.connection.channel, call.response);
         if (numBytes < 0) {
@@ -1223,6 +1238,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
      */
     private boolean processAllResponses(final Connection connection) throws 
IOException {
       // We want only one writer on the channel for a connection at a time.
+      boolean isEmpty = false;
       connection.responseWriteLock.lock();
       try {
         for (int i = 0; i < 20; i++) {
@@ -1236,11 +1252,13 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
             return false;
           }
         }
+        // Check that state within the lock to be consistent
+        isEmpty = connection.responseQueue.isEmpty();
       } finally {
         connection.responseWriteLock.unlock();
       }
 
-      return connection.responseQueue.isEmpty();
+      return isEmpty;
     }
 
     //

Reply via email to