Make SecureRpcHelper a Netty pipeline stage

This decouples SecureRpcHelper from TabletClient. When negotiation is
complete, it sends an "event" upstream to the TabletClient and removes
itself from the pipeline.

I'm hoping this decoupling will make it easier to test SecureRpcHelper
in isolation as we add more functionality like TLS negotiation.

After this is committed, I'm also hoping to rename SecureRpcHelper to
Negotiator.

Change-Id: I5f4b4516219b8eebf24786b4ceb13f2e6260f03b
Reviewed-on: http://gerrit.cloudera.org:8080/5927
Reviewed-by: Dan Burkert <danburk...@apache.org>
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jdcry...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a25162ab
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a25162ab
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a25162ab

Branch: refs/heads/master
Commit: a25162abb068519df3fdb44751ffd18cb9fe630e
Parents: 24b0624
Author: Todd Lipcon <t...@apache.org>
Authored: Tue Feb 7 02:44:47 2017 -0800
Committer: Todd Lipcon <t...@apache.org>
Committed: Wed Feb 8 21:27:55 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/CallResponse.java    |  29 ++++-
 .../org/apache/kudu/client/ConnectionCache.java |   1 +
 .../org/apache/kudu/client/SecureRpcHelper.java | 108 ++++++++++++-------
 .../org/apache/kudu/client/TabletClient.java    |  61 +++--------
 4 files changed, 116 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/a25162ab/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
index 3b93f60..3a9a26f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
@@ -19,7 +19,12 @@ package org.apache.kudu.client;
 
 import java.util.List;
 
+import com.google.protobuf.Message;
+
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.rpc.RpcHeader;
@@ -48,7 +53,7 @@ final class CallResponse {
    * @throws IndexOutOfBoundsException if any length prefix inside the
    * response points outside the bounds of the buffer.
    */
-  public CallResponse(final ChannelBuffer buf) {
+  private CallResponse(final ChannelBuffer buf) {
     this.buf = buf;
 
     this.totalResponseSize = buf.readableBytes();
@@ -74,6 +79,12 @@ final class CallResponse {
     return this.totalResponseSize;
   }
 
+  public <T extends Message> T parseResponse(T prototypeInstance) {
+    prototypeInstance.newBuilderForType();
+    return prototypeInstance;
+
+  }
+
   /**
    * @return A slice pointing to the section of the packet reserved for the main
    * protobuf message.
@@ -153,4 +164,20 @@ final class CallResponse {
     }
     return new Slice(payload, offset, length);
   }
+
+  /**
+   * Netty channel handler which receives incoming frames (ChannelBuffers)
+   * and constructs CallResponse objects.
+   */
+  static class Decoder extends OneToOneDecoder {
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, Channel channel, Object message)
+        throws Exception {
+      if (!(message instanceof ChannelBuffer)) {
+        return message;
+      }
+      return new CallResponse((ChannelBuffer)message);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/a25162ab/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index da2725e..aa47e5d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -255,6 +255,7 @@ class ConnectionCache {
           4, // length prefix is 4 bytes long
           0, // no "length adjustment"
           4 /* strip the length prefix */));
+      super.addLast("decode-responses", new CallResponse.Decoder());
       AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
       final TabletClient client = new TabletClient(kuduClient, serverInfo);
       if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/a25162ab/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
index 8c7cdee..bf2fece 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
@@ -51,15 +51,19 @@ import com.google.protobuf.ZeroCopyLiteralByteString;
 
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.rpc.RpcHeader;
+import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
 
 @InterfaceAudience.Private
-public class SecureRpcHelper {
+public class SecureRpcHelper extends SimpleChannelUpstreamHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
 
@@ -75,22 +79,20 @@ public class SecureRpcHelper {
 
   static final String USER_AND_PASSWORD = "java_client";
 
-  private final TabletClient client;
+  private boolean finished;
   private SaslClient saslClient;
+  public static final int CONNECTION_CTX_CALL_ID = -3;
   private static final int SASL_CALL_ID = -33;
   private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
       ImmutableSet.of(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS);
-  private volatile boolean negoUnderway = true;
   private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
 
-  public SecureRpcHelper(final TabletClient client) {
-    this.client = client;
-  }
+  private final Subject subject;
+  private final String remoteHostname;
 
-  public Set<RpcHeader.RpcFeatureFlag> getServerFeatures() {
-    Preconditions.checkState(!negoUnderway);
-    Preconditions.checkNotNull(serverFeatures);
-    return serverFeatures;
+  public SecureRpcHelper(Subject subject, String remoteHostname) {
+    this.subject = subject;
+    this.remoteHostname = remoteHostname;
   }
 
   public void sendHello(Channel channel) {
@@ -119,30 +121,38 @@ public class SecureRpcHelper {
     Channels.write(channel, buffer);
   }
 
-  public ChannelBuffer handleResponse(ChannelBuffer buf, Channel chan) throws SaslException {
-    if (negoUnderway) {
-      RpcHeader.NegotiatePB response = parseSaslMsgResponse(buf);
-      switch (response.getStep()) {
-        case NEGOTIATE:
-          handleNegotiateResponse(chan, response);
-          break;
-        case SASL_CHALLENGE:
-          handleChallengeResponse(chan, response);
-          break;
-        case SASL_SUCCESS:
-          handleSuccessResponse(chan);
-          break;
-        default:
-          LOG.error(String.format("Wrong negotiation step: %s", response.getStep()));
-      }
-      return null;
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+      throws Exception {
+    Object m = evt.getMessage();
+    if (!(m instanceof CallResponse)) {
+      ctx.sendUpstream(evt);
+      return;
     }
-    return buf;
+    handleResponse(ctx.getChannel(), (CallResponse)m);
   }
 
+  private void handleResponse(Channel chan, CallResponse callResponse) throws SaslException {
+    // TODO(todd): this needs to handle error responses, not just success responses.
+    Preconditions.checkState(!finished, "received a response after negotiation was complete");
+    RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse);
+    switch (response.getStep()) {
+      case NEGOTIATE:
+        handleNegotiateResponse(chan, response);
+        break;
+      case SASL_CHALLENGE:
+        handleChallengeResponse(chan, response);
+        break;
+      case SASL_SUCCESS:
+        handleSuccessResponse(chan);
+        break;
+      default:
+        LOG.error(String.format("Wrong negotiation step: %s", response.getStep()));
+    }
+  }
 
-  private RpcHeader.NegotiatePB parseSaslMsgResponse(ChannelBuffer buf) {
-    CallResponse response = new CallResponse(buf);
+
+  private RpcHeader.NegotiatePB parseSaslMsgResponse(CallResponse response) {
     RpcHeader.ResponseHeader responseHeader = response.getHeader();
     int id = responseHeader.getCallId();
     if (id != SASL_CALL_ID) {
@@ -156,7 +166,6 @@ public class SecureRpcHelper {
 
   private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws
       SaslException {
-    Preconditions.checkNotNull(chan);
     // Store the supported features advertised by the server.
     ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder();
     for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {
@@ -188,7 +197,7 @@ public class SecureRpcHelper {
         saslClient = Sasl.createSaslClient(new String[]{ clientMech },
                                            null,
                                            "kudu",
-                                           client.getServerInfo().getHostname(),
+                                           remoteHostname,
                                            SASL_PROPS,
                                            SASL_CALLBACK);
         if (saslClient.hasInitialResponse()) {
@@ -230,13 +239,28 @@ public class SecureRpcHelper {
   }
 
   private void handleSuccessResponse(Channel chan) {
-    LOG.debug("nego finished");
-    negoUnderway = false;
-    client.sendContext(chan);
+    finished = true;
+    chan.getPipeline().remove(this);
+
+    Channels.write(chan, makeConnectionContext());
+    Channels.fireMessageReceived(chan, new NegotiationResult(serverFeatures));
+  }
+
+  private ChannelBuffer makeConnectionContext() {
+    RpcHeader.ConnectionContextPB.Builder builder = RpcHeader.ConnectionContextPB.newBuilder();
+
+    // The UserInformationPB is deprecated, but used by servers prior to Kudu 1.1.
+    RpcHeader.UserInformationPB.Builder userBuilder = RpcHeader.UserInformationPB.newBuilder();
+    userBuilder.setEffectiveUser(SecureRpcHelper.USER_AND_PASSWORD);
+    userBuilder.setRealUser(SecureRpcHelper.USER_AND_PASSWORD);
+    builder.setDEPRECATEDUserInfo(userBuilder.build());
+    RpcHeader.ConnectionContextPB pb = builder.build();
+    RpcHeader.RequestHeader header =
+        RpcHeader.RequestHeader.newBuilder().setCallId(CONNECTION_CTX_CALL_ID).build();
+    return KuduRpc.toChannelBuffer(header, pb);
   }
 
   private byte[] evaluateChallenge(final byte[] challenge) throws SaslException {
-    final Subject subject = client.getSubject();
     try {
       return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
           @Override
@@ -264,4 +288,16 @@ public class SecureRpcHelper {
       }
     }
   }
+
+  /**
+   * The results of a successful negotiation. This is sent to upstream handlers in the
+   * Netty pipeline after negotiation completes.
+   */
+  static class NegotiationResult {
+    final Set<RpcFeatureFlag> serverFeatures;
+
+    public NegotiationResult(Set<RpcFeatureFlag> serverFeatures) {
+      this.serverFeatures = serverFeatures;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/a25162ab/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index f17e5a1..33ec662 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -34,7 +34,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.auth.Subject;
-import javax.security.sasl.SaslException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.stumbleupon.async.Deferred;
@@ -57,6 +56,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.WireProtocol;
 import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.client.SecureRpcHelper.NegotiationResult;
 import org.apache.kudu.master.Master;
 import org.apache.kudu.rpc.RpcHeader;
 import org.apache.kudu.tserver.Tserver;
@@ -95,7 +95,6 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
       0,
       0
   };
-  public static final int CONNECTION_CTX_CALL_ID = -3;
 
   /**
    * A monotonically increasing counter for RPC IDs.
@@ -136,8 +135,6 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
 
   private final long socketReadTimeoutMs;
 
-  private SecureRpcHelper secureRpcHelper;
-
   private final RequestTracker requestTracker;
 
   private final ServerInfo serverInfo;
@@ -146,6 +143,8 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
   // differently by also clearing the caches.
   private volatile boolean gotUncaughtException = false;
 
+  private NegotiationResult negotiationResult;
+
   public TabletClient(AsyncKuduClient client, ServerInfo serverInfo) {
     this.kuduClient = client;
     this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs();
@@ -167,7 +166,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
     Pair<ChannelBuffer, Integer> encodedRpcAndId = null;
     if (chan != null) {
       if (!rpc.getRequiredFeatures().isEmpty() &&
-          !secureRpcHelper.getServerFeatures().contains(
+          !negotiationResult.serverFeatures.contains(
               RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS)) {
         Status statusNotSupported = Status.NotSupported("the server does not support the" +
             "APPLICATION_FEATURE_FLAGS RPC feature");
@@ -379,29 +378,18 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
   @SuppressWarnings("unchecked")
   public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
     Object m = evt.getMessage();
-    if (!(m instanceof ChannelBuffer)) {
-      ctx.sendUpstream(evt);
+    if (m instanceof SecureRpcHelper.NegotiationResult) {
+      this.negotiationResult = (NegotiationResult) m;
+      this.chan = ctx.getChannel();
+      sendQueuedRpcs();
       return;
     }
-    Channel chan = ctx.getChannel();
-    ChannelBuffer buf = (ChannelBuffer)m;
-    final long start = System.nanoTime();
-    final int rdx = buf.readerIndex();
-    LOG.debug("------------------>> ENTERING DECODE >>------------------");
-
-    try {
-      buf = secureRpcHelper.handleResponse(buf, chan);
-    } catch (SaslException e) {
-      String message = getPeerUuidLoggingString() + "Couldn't complete the SASL handshake";
-      LOG.error(message);
-      Status statusIOE = Status.IOError(message);
-      throw new NonRecoverableException(statusIOE, e);
-    }
-    if (buf == null) {
+    if (!(m instanceof CallResponse)) {
+      ctx.sendUpstream(evt);
       return;
     }
-
-    CallResponse response = new CallResponse(buf);
+    CallResponse response = (CallResponse)m;
+    final long start = System.nanoTime();
 
     RpcHeader.ResponseHeader header = response.getHeader();
     if (!header.hasCallId()) {
@@ -465,7 +453,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug(getPeerUuidLoggingString() + "rpcid=" + rpcid +
-          ", response size=" + (buf.readerIndex() - rdx) + " bytes" +
+          ", response size=" + response.getTotalResponseSize() +
           ", rpc=" + rpc);
     }
 
@@ -615,7 +603,8 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
     final Channel chan = e.getChannel();
     Channels.write(chan, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
 
-    secureRpcHelper = new SecureRpcHelper(this);
+    SecureRpcHelper secureRpcHelper = new SecureRpcHelper(getSubject(), serverInfo.getHostname());
+    ctx.getPipeline().addBefore(ctx.getName(), "negotiation", secureRpcHelper);
     secureRpcHelper.sendHello(chan);
   }
 
@@ -775,26 +764,6 @@ public class TabletClient extends 
SimpleChannelUpstreamHandler {
     }
   }
 
-  void sendContext(Channel channel) {
-    Channels.write(channel,  header());
-    this.chan = channel;
-    sendQueuedRpcs();
-  }
-
-  private ChannelBuffer header() {
-    RpcHeader.ConnectionContextPB.Builder builder = RpcHeader.ConnectionContextPB.newBuilder();
-
-    // The UserInformationPB is deprecated, but used by servers prior to Kudu 1.1.
-    RpcHeader.UserInformationPB.Builder userBuilder = RpcHeader.UserInformationPB.newBuilder();
-    userBuilder.setEffectiveUser(SecureRpcHelper.USER_AND_PASSWORD);
-    userBuilder.setRealUser(SecureRpcHelper.USER_AND_PASSWORD);
-    builder.setDEPRECATEDUserInfo(userBuilder.build());
-    RpcHeader.ConnectionContextPB pb = builder.build();
-    RpcHeader.RequestHeader header =
-        RpcHeader.RequestHeader.newBuilder().setCallId(CONNECTION_CTX_CALL_ID).build();
-    return KuduRpc.toChannelBuffer(header, pb);
-  }
-
   private String getPeerUuidLoggingString() {
     return "[Peer " + serverInfo.getUuid() + "] ";
   }

Reply via email to