Make SecureRpcHelper a Netty pipeline stage This decouples SecureRpcHelper from TabletClient. When negotiation is complete, it sends an "event" upstream to the TabletClient and removes itself from the pipeline.
I'm hoping this decoupling will make it easier to test SecureRpcHelper in isolation as we add more functionality like TLS negotiation. After this is committed, I'm also hoping to rename SecureRpcHelper to Negotiator. Change-Id: I5f4b4516219b8eebf24786b4ceb13f2e6260f03b Reviewed-on: http://gerrit.cloudera.org:8080/5927 Reviewed-by: Dan Burkert <danburk...@apache.org> Tested-by: Kudu Jenkins Reviewed-by: Jean-Daniel Cryans <jdcry...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a25162ab Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a25162ab Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a25162ab Branch: refs/heads/master Commit: a25162abb068519df3fdb44751ffd18cb9fe630e Parents: 24b0624 Author: Todd Lipcon <t...@apache.org> Authored: Tue Feb 7 02:44:47 2017 -0800 Committer: Todd Lipcon <t...@apache.org> Committed: Wed Feb 8 21:27:55 2017 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/CallResponse.java | 29 ++++- .../org/apache/kudu/client/ConnectionCache.java | 1 + .../org/apache/kudu/client/SecureRpcHelper.java | 108 ++++++++++++------- .../org/apache/kudu/client/TabletClient.java | 61 +++-------- 4 files changed, 116 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/a25162ab/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java index 3b93f60..3a9a26f 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java @@ -19,7 +19,12 @@ package org.apache.kudu.client; import java.util.List; +import com.google.protobuf.Message; + import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; import org.apache.kudu.annotations.InterfaceAudience; import org.apache.kudu.rpc.RpcHeader; @@ -48,7 +53,7 @@ final class CallResponse { * @throws IndexOutOfBoundsException if any length prefix inside the * response points outside the bounds of the buffer. */ - public CallResponse(final ChannelBuffer buf) { + private CallResponse(final ChannelBuffer buf) { this.buf = buf; this.totalResponseSize = buf.readableBytes(); @@ -74,6 +79,12 @@ final class CallResponse { return this.totalResponseSize; } + public <T extends Message> T parseResponse(T prototypeInstance) { + prototypeInstance.newBuilderForType(); + return prototypeInstance; + + } + /** * @return A slice pointing to the section of the packet reserved for the main * protobuf message. @@ -153,4 +164,20 @@ final class CallResponse { } return new Slice(payload, offset, length); } + + /** + * Netty channel handler which receives incoming frames (ChannelBuffers) + * and constructs CallResponse objects. + */ + static class Decoder extends OneToOneDecoder { + @Override + protected Object decode(ChannelHandlerContext ctx, Channel channel, Object message) + throws Exception { + if (!(message instanceof ChannelBuffer)) { + return message; + } + return new CallResponse((ChannelBuffer)message); + } + } + } http://git-wip-us.apache.org/repos/asf/kudu/blob/a25162ab/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java index da2725e..aa47e5d 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java @@ -255,6 +255,7 @@ class ConnectionCache { 4, // length prefix is 4 bytes long 0, // no "length adjustment" 4 /* strip the length prefix */)); + super.addLast("decode-responses", new CallResponse.Decoder()); AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient; final TabletClient client = new TabletClient(kuduClient, serverInfo); if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) { http://git-wip-us.apache.org/repos/asf/kudu/blob/a25162ab/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java index 8c7cdee..bf2fece 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java @@ -51,15 +51,19 @@ import com.google.protobuf.ZeroCopyLiteralByteString; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.kudu.annotations.InterfaceAudience; import org.apache.kudu.rpc.RpcHeader; +import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag; @InterfaceAudience.Private -public class SecureRpcHelper { +public class SecureRpcHelper extends SimpleChannelUpstreamHandler { private static final Logger LOG = LoggerFactory.getLogger(TabletClient.class); @@ -75,22 +79,20 @@ public class SecureRpcHelper { static final String USER_AND_PASSWORD = "java_client"; - private final TabletClient client; + private boolean finished; private SaslClient saslClient; + public static final int CONNECTION_CTX_CALL_ID = -3; private static final int SASL_CALL_ID = -33; private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES = ImmutableSet.of(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS); - private volatile boolean negoUnderway = true; private Set<RpcHeader.RpcFeatureFlag> serverFeatures; - public SecureRpcHelper(final TabletClient client) { - this.client = client; - } + private final Subject subject; + private final String remoteHostname; - public Set<RpcHeader.RpcFeatureFlag> getServerFeatures() { - Preconditions.checkState(!negoUnderway); - Preconditions.checkNotNull(serverFeatures); - return serverFeatures; + public SecureRpcHelper(Subject subject, String remoteHostname) { + this.subject = subject; + this.remoteHostname = remoteHostname; } public void sendHello(Channel channel) { @@ -119,30 +121,38 @@ public class SecureRpcHelper { Channels.write(channel, buffer); } - public ChannelBuffer handleResponse(ChannelBuffer buf, Channel chan) throws SaslException { - if (negoUnderway) { - RpcHeader.NegotiatePB response = parseSaslMsgResponse(buf); - switch (response.getStep()) { - case NEGOTIATE: - handleNegotiateResponse(chan, response); - break; - case SASL_CHALLENGE: - handleChallengeResponse(chan, response); - break; - case SASL_SUCCESS: - handleSuccessResponse(chan); - break; - default: - LOG.error(String.format("Wrong negotiation step: %s", response.getStep())); - } - return null; + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) + throws Exception { + Object m = evt.getMessage(); + if (!(m instanceof CallResponse)) { + ctx.sendUpstream(evt); + return; } - return buf; + handleResponse(ctx.getChannel(), (CallResponse)m); } + private void handleResponse(Channel chan, CallResponse callResponse) throws SaslException { + // TODO(todd): this needs to handle error responses, not just success responses. + Preconditions.checkState(!finished, "received a response after negotiation was complete"); + RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse); + switch (response.getStep()) { + case NEGOTIATE: + handleNegotiateResponse(chan, response); + break; + case SASL_CHALLENGE: + handleChallengeResponse(chan, response); + break; + case SASL_SUCCESS: + handleSuccessResponse(chan); + break; + default: + LOG.error(String.format("Wrong negotiation step: %s", response.getStep())); + } + } - private RpcHeader.NegotiatePB parseSaslMsgResponse(ChannelBuffer buf) { - CallResponse response = new CallResponse(buf); + + private RpcHeader.NegotiatePB parseSaslMsgResponse(CallResponse response) { RpcHeader.ResponseHeader responseHeader = response.getHeader(); int id = responseHeader.getCallId(); if (id != SASL_CALL_ID) { @@ -156,7 +166,6 @@ public class SecureRpcHelper { private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws SaslException { - Preconditions.checkNotNull(chan); // Store the supported features advertised by the server. ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder(); for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) { @@ -188,7 +197,7 @@ public class SecureRpcHelper { saslClient = Sasl.createSaslClient(new String[]{ clientMech }, null, "kudu", - client.getServerInfo().getHostname(), + remoteHostname, SASL_PROPS, SASL_CALLBACK); if (saslClient.hasInitialResponse()) { @@ -230,13 +239,28 @@ public class SecureRpcHelper { } private void handleSuccessResponse(Channel chan) { - LOG.debug("nego finished"); - negoUnderway = false; - client.sendContext(chan); + finished = true; + chan.getPipeline().remove(this); + + Channels.write(chan, makeConnectionContext()); + Channels.fireMessageReceived(chan, new NegotiationResult(serverFeatures)); + } + + private ChannelBuffer makeConnectionContext() { + RpcHeader.ConnectionContextPB.Builder builder = RpcHeader.ConnectionContextPB.newBuilder(); + + // The UserInformationPB is deprecated, but used by servers prior to Kudu 1.1. + RpcHeader.UserInformationPB.Builder userBuilder = RpcHeader.UserInformationPB.newBuilder(); + userBuilder.setEffectiveUser(SecureRpcHelper.USER_AND_PASSWORD); + userBuilder.setRealUser(SecureRpcHelper.USER_AND_PASSWORD); + builder.setDEPRECATEDUserInfo(userBuilder.build()); + RpcHeader.ConnectionContextPB pb = builder.build(); + RpcHeader.RequestHeader header = + RpcHeader.RequestHeader.newBuilder().setCallId(CONNECTION_CTX_CALL_ID).build(); + return KuduRpc.toChannelBuffer(header, pb); } private byte[] evaluateChallenge(final byte[] challenge) throws SaslException { - final Subject subject = client.getSubject(); try { return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() { @Override @@ -264,4 +288,16 @@ public class SecureRpcHelper { } } } + + /** + * The results of a successful negotiation. This is sent to upstream handlers in the + * Netty pipeline after negotiation completes. + */ + static class NegotiationResult { + final Set<RpcFeatureFlag> serverFeatures; + + public NegotiationResult(Set<RpcFeatureFlag> serverFeatures) { + this.serverFeatures = serverFeatures; + } + } } http://git-wip-us.apache.org/repos/asf/kudu/blob/a25162ab/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java index f17e5a1..33ec662 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java @@ -34,7 +34,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; import javax.security.auth.Subject; -import javax.security.sasl.SaslException; import com.google.common.annotations.VisibleForTesting; import com.stumbleupon.async.Deferred; @@ -57,6 +56,7 @@ import org.slf4j.LoggerFactory; import org.apache.kudu.WireProtocol; import org.apache.kudu.annotations.InterfaceAudience; +import org.apache.kudu.client.SecureRpcHelper.NegotiationResult; import org.apache.kudu.master.Master; import org.apache.kudu.rpc.RpcHeader; import org.apache.kudu.tserver.Tserver; @@ -95,7 +95,6 @@ public class TabletClient extends SimpleChannelUpstreamHandler { 0, 0 }; - public static final int CONNECTION_CTX_CALL_ID = -3; /** * A monotonically increasing counter for RPC IDs. @@ -136,8 +135,6 @@ public class TabletClient extends SimpleChannelUpstreamHandler { private final long socketReadTimeoutMs; - private SecureRpcHelper secureRpcHelper; - private final RequestTracker requestTracker; private final ServerInfo serverInfo; @@ -146,6 +143,8 @@ public class TabletClient extends SimpleChannelUpstreamHandler { // differently by also clearing the caches. private volatile boolean gotUncaughtException = false; + private NegotiationResult negotiationResult; + public TabletClient(AsyncKuduClient client, ServerInfo serverInfo) { this.kuduClient = client; this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs(); @@ -167,7 +166,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler { Pair<ChannelBuffer, Integer> encodedRpcAndId = null; if (chan != null) { if (!rpc.getRequiredFeatures().isEmpty() && - !secureRpcHelper.getServerFeatures().contains( + !negotiationResult.serverFeatures.contains( RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS)) { Status statusNotSupported = Status.NotSupported("the server does not support the" + "APPLICATION_FEATURE_FLAGS RPC feature"); @@ -379,29 +378,18 @@ public class TabletClient extends SimpleChannelUpstreamHandler { @SuppressWarnings("unchecked") public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception { Object m = evt.getMessage(); - if (!(m instanceof ChannelBuffer)) { - ctx.sendUpstream(evt); + if (m instanceof SecureRpcHelper.NegotiationResult) { + this.negotiationResult = (NegotiationResult) m; + this.chan = ctx.getChannel(); + sendQueuedRpcs(); return; } - Channel chan = ctx.getChannel(); - ChannelBuffer buf = (ChannelBuffer)m; - final long start = System.nanoTime(); - final int rdx = buf.readerIndex(); - LOG.debug("------------------>> ENTERING DECODE >>------------------"); - - try { - buf = secureRpcHelper.handleResponse(buf, chan); - } catch (SaslException e) { - String message = getPeerUuidLoggingString() + "Couldn't complete the SASL handshake"; - LOG.error(message); - Status statusIOE = Status.IOError(message); - throw new NonRecoverableException(statusIOE, e); - } - if (buf == null) { + if (!(m instanceof CallResponse)) { + ctx.sendUpstream(evt); return; } - - CallResponse response = new CallResponse(buf); + CallResponse response = (CallResponse)m; + final long start = System.nanoTime(); RpcHeader.ResponseHeader header = response.getHeader(); if (!header.hasCallId()) { @@ -465,7 +453,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler { } if (LOG.isDebugEnabled()) { LOG.debug(getPeerUuidLoggingString() + "rpcid=" + rpcid + - ", response size=" + (buf.readerIndex() - rdx) + " bytes" + + ", response size=" + response.getTotalResponseSize() + ", rpc=" + rpc); } @@ -615,7 +603,8 @@ public class TabletClient extends SimpleChannelUpstreamHandler { final Channel chan = e.getChannel(); Channels.write(chan, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER)); - secureRpcHelper = new SecureRpcHelper(this); + SecureRpcHelper secureRpcHelper = new SecureRpcHelper(getSubject(), serverInfo.getHostname()); + ctx.getPipeline().addBefore(ctx.getName(), "negotiation", secureRpcHelper); secureRpcHelper.sendHello(chan); } @@ -775,26 +764,6 @@ public class TabletClient extends SimpleChannelUpstreamHandler { } } - void sendContext(Channel channel) { - Channels.write(channel, header()); - this.chan = channel; - sendQueuedRpcs(); - } - - private ChannelBuffer header() { - RpcHeader.ConnectionContextPB.Builder builder = RpcHeader.ConnectionContextPB.newBuilder(); - - // The UserInformationPB is deprecated, but used by servers prior to Kudu 1.1. - RpcHeader.UserInformationPB.Builder userBuilder = RpcHeader.UserInformationPB.newBuilder(); - userBuilder.setEffectiveUser(SecureRpcHelper.USER_AND_PASSWORD); - userBuilder.setRealUser(SecureRpcHelper.USER_AND_PASSWORD); - builder.setDEPRECATEDUserInfo(userBuilder.build()); - RpcHeader.ConnectionContextPB pb = builder.build(); - RpcHeader.RequestHeader header = - RpcHeader.RequestHeader.newBuilder().setCallId(CONNECTION_CTX_CALL_ID).build(); - return KuduRpc.toChannelBuffer(header, pb); - } - private String getPeerUuidLoggingString() { return "[Peer " + serverInfo.getUuid() + "] "; }