Repository: hbase
Updated Branches:
  refs/heads/master 5cdaca5c0 -> 341223d86


http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
new file mode 100644
index 0000000..50a1a6b
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
+import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
+import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.security.SaslStatus;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import 
org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.htrace.TraceInfo;
+
+/** Reads calls from a connection and queues them for handling. */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"VO_VOLATILE_INCREMENT",
+    justification = "False positive according to 
http://sourceforge.net/p/findbugs/bugs/1032/";)
+@InterfaceAudience.Private
+class SimpleServerRpcConnection extends ServerRpcConnection {
+
+  final SocketChannel channel;
+  private ByteBuff data;
+  private ByteBuffer dataLengthBuffer;
+  protected final ConcurrentLinkedDeque<SimpleServerCall> responseQueue =
+      new ConcurrentLinkedDeque<>();
+  final Lock responseWriteLock = new ReentrantLock();
+  private final LongAdder rpcCount = new LongAdder(); // number of outstanding 
rpcs
+  private long lastContact;
+  private final Socket socket;
+  private final SimpleRpcServerResponder responder;
+
+  public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel 
channel,
+      long lastContact) {
+    super(rpcServer);
+    this.channel = channel;
+    this.lastContact = lastContact;
+    this.data = null;
+    this.dataLengthBuffer = ByteBuffer.allocate(4);
+    this.socket = channel.socket();
+    this.addr = socket.getInetAddress();
+    if (addr == null) {
+      this.hostAddress = "*Unknown*";
+    } else {
+      this.hostAddress = addr.getHostAddress();
+    }
+    this.remotePort = socket.getPort();
+    if (rpcServer.socketSendBufferSize != 0) {
+      try {
+        socket.setSendBufferSize(rpcServer.socketSendBufferSize);
+      } catch (IOException e) {
+        SimpleRpcServer.LOG.warn(
+          "Connection: unable to set socket send buffer size to " + 
rpcServer.socketSendBufferSize);
+      }
+    }
+    this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, 
null, this, 0, null,
+        null, System.currentTimeMillis(), 0, rpcServer.reservoir, 
rpcServer.cellBlockBuilder, null,
+        rpcServer.responder);
+    this.setConnectionHeaderResponseCall = new 
SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
+        null, null, null, null, null, this, 0, null, null, 
System.currentTimeMillis(), 0,
+        rpcServer.reservoir, rpcServer.cellBlockBuilder, null, 
rpcServer.responder);
+    this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, 
null, null, null, null,
+        null, this, 0, null, null, System.currentTimeMillis(), 0, 
rpcServer.reservoir,
+        rpcServer.cellBlockBuilder, null, rpcServer.responder);
+    this.responder = rpcServer.responder;
+  }
+
+  public void setLastContact(long lastContact) {
+    this.lastContact = lastContact;
+  }
+
+  public long getLastContact() {
+    return lastContact;
+  }
+
+  /* Return true if the connection has no outstanding rpc */
+  boolean isIdle() {
+    return rpcCount.sum() == 0;
+  }
+
+  /* Decrement the outstanding RPC count */
+  protected void decRpcCount() {
+    rpcCount.decrement();
+  }
+
+  /* Increment the outstanding RPC count */
+  protected void incRpcCount() {
+    rpcCount.increment();
+  }
+
+  private int readPreamble() throws IOException {
+    int count;
+    // Check for 'HBas' magic.
+    this.dataLengthBuffer.flip();
+    if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
+      return doBadPreambleHandling(
+        "Expected HEADER=" + Bytes.toStringBinary(HConstants.RPC_HEADER) + " 
but received HEADER=" +
+            Bytes.toStringBinary(dataLengthBuffer.array()) + " from " + 
toString());
+    }
+    // Now read the next two bytes, the version and the auth to use.
+    ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
+    count = this.rpcServer.channelRead(channel, versionAndAuthBytes);
+    if (count < 0 || versionAndAuthBytes.remaining() > 0) {
+      return count;
+    }
+    int version = versionAndAuthBytes.get(0);
+    byte authbyte = versionAndAuthBytes.get(1);
+    this.authMethod = AuthMethod.valueOf(authbyte);
+    if (version != SimpleRpcServer.CURRENT_VERSION) {
+      String msg = getFatalConnectionString(version, authbyte);
+      return doBadPreambleHandling(msg, new WrongVersionException(msg));
+    }
+    if (authMethod == null) {
+      String msg = getFatalConnectionString(version, authbyte);
+      return doBadPreambleHandling(msg, new BadAuthException(msg));
+    }
+    if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+      if (this.rpcServer.allowFallbackToSimpleAuth) {
+        this.rpcServer.metrics.authenticationFallback();
+        authenticatedWithFallback = true;
+      } else {
+        AccessDeniedException ae = new AccessDeniedException("Authentication 
is required");
+        this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, 
ae.getMessage());
+        authFailedCall.sendResponseIfReady();
+        throw ae;
+      }
+    }
+    if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+      doRawSaslReply(SaslStatus.SUCCESS, new 
IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
+        null);
+      authMethod = AuthMethod.SIMPLE;
+      // client has already sent the initial Sasl message and we
+      // should ignore it. Both client and server should fall back
+      // to simple auth from now on.
+      skipInitialSaslHandshake = true;
+    }
+    if (authMethod != AuthMethod.SIMPLE) {
+      useSasl = true;
+    }
+
+    dataLengthBuffer.clear();
+    connectionPreambleRead = true;
+    return count;
+  }
+
+  private int read4Bytes() throws IOException {
+    if (this.dataLengthBuffer.remaining() > 0) {
+      return this.rpcServer.channelRead(channel, this.dataLengthBuffer);
+    } else {
+      return 0;
+    }
+  }
+
+  /**
+   * Read off the wire. If there is not enough data to read, update the 
connection state with what
+   * we have and returns.
+   * @return Returns -1 if failure (and caller will close connection), else 
zero or more.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public int readAndProcess() throws IOException, InterruptedException {
+    // Try and read in an int. If new connection, the int will hold the 'HBas' 
HEADER. If it
+    // does, read in the rest of the connection preamble, the version and the 
auth method.
+    // Else it will be length of the data to read (or -1 if a ping). We catch 
the integer
+    // length into the 4-byte this.dataLengthBuffer.
+    int count = read4Bytes();
+    if (count < 0 || dataLengthBuffer.remaining() > 0) {
+      return count;
+    }
+
+    // If we have not read the connection setup preamble, look to see if that 
is on the wire.
+    if (!connectionPreambleRead) {
+      count = readPreamble();
+      if (!connectionPreambleRead) {
+        return count;
+      }
+
+      count = read4Bytes();
+      if (count < 0 || dataLengthBuffer.remaining() > 0) {
+        return count;
+      }
+    }
+
+    // We have read a length and we have read the preamble. It is either the 
connection header
+    // or it is a request.
+    if (data == null) {
+      dataLengthBuffer.flip();
+      int dataLength = dataLengthBuffer.getInt();
+      if (dataLength == RpcClient.PING_CALL_ID) {
+        if (!useWrap) { // covers the !useSasl too
+          dataLengthBuffer.clear();
+          return 0; // ping message
+        }
+      }
+      if (dataLength < 0) { // A data length of zero is legal.
+        throw new DoNotRetryIOException(
+            "Unexpected data length " + dataLength + "!! from " + 
getHostAddress());
+      }
+
+      if (dataLength > this.rpcServer.maxRequestSize) {
+        String msg = "RPC data length of " + dataLength + " received from " + 
getHostAddress() +
+            " is greater than max allowed " + this.rpcServer.maxRequestSize + 
". Set \"" +
+            SimpleRpcServer.MAX_REQUEST_SIZE +
+            "\" on server to override this limit (not recommended)";
+        SimpleRpcServer.LOG.warn(msg);
+
+        if (connectionHeaderRead && connectionPreambleRead) {
+          incRpcCount();
+          // Construct InputStream for the non-blocking SocketChannel
+          // We need the InputStream because we want to read only the request 
header
+          // instead of the whole rpc.
+          ByteBuffer buf = ByteBuffer.allocate(1);
+          InputStream is = new InputStream() {
+            @Override
+            public int read() throws IOException {
+              SimpleServerRpcConnection.this.rpcServer.channelRead(channel, 
buf);
+              buf.flip();
+              int x = buf.get();
+              buf.flip();
+              return x;
+            }
+          };
+          CodedInputStream cis = CodedInputStream.newInstance(is);
+          int headerSize = cis.readRawVarint32();
+          Message.Builder builder = RequestHeader.newBuilder();
+          ProtobufUtil.mergeFrom(builder, cis, headerSize);
+          RequestHeader header = (RequestHeader) builder.build();
+
+          // Notify the client about the offending request
+          SimpleServerCall reqTooBig = new 
SimpleServerCall(header.getCallId(), this.service, null,
+              null, null, null, this, 0, null, this.addr, 
System.currentTimeMillis(), 0,
+              this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, null, 
responder);
+          
this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
+          // Make sure the client recognizes the underlying exception
+          // Otherwise, throw a DoNotRetryIOException.
+          if 
(VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
+            RequestTooBigException.MAJOR_VERSION, 
RequestTooBigException.MINOR_VERSION)) {
+            this.rpcServer.setupResponse(null, reqTooBig, 
SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION,
+              msg);
+          } else {
+            this.rpcServer.setupResponse(null, reqTooBig, new 
DoNotRetryIOException(), msg);
+          }
+          // We are going to close the connection, make sure we process the 
response
+          // before that. In rare case when this fails, we still close the 
connection.
+          responseWriteLock.lock();
+          try {
+            this.responder.processResponse(reqTooBig);
+          } finally {
+            responseWriteLock.unlock();
+          }
+        }
+        // Close the connection
+        return -1;
+      }
+
+      // Initialize this.data with a ByteBuff.
+      // This call will allocate a ByteBuff to read request into and assign to 
this.data
+      // Also when we use some buffer(s) from pool, it will create a 
CallCleanup instance also and
+      // assign to this.callCleanup
+      initByteBuffToReadInto(dataLength);
+
+      // Increment the rpc count. This counter will be decreased when we write
+      // the response. If we want the connection to be detected as idle 
properly, we
+      // need to keep the inc / dec correct.
+      incRpcCount();
+    }
+
+    count = channelDataRead(channel, data);
+
+    if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
+      process();
+    }
+
+    return count;
+  }
+
+  // It creates the ByteBuff and CallCleanup and assign to Connection instance.
+  private void initByteBuffToReadInto(int length) {
+    // We create random on heap buffers are read into those when
+    // 1. ByteBufferPool is not there.
+    // 2. When the size of the req is very small. Using a large sized (64 KB) 
buffer from pool is
+    // waste then. Also if all the reqs are of this size, we will be creating 
larger sized
+    // buffers and pool them permanently. This include Scan/Get request and 
DDL kind of reqs like
+    // RegionOpen.
+    // 3. If it is an initial handshake signal or initial connection request. 
Any way then
+    // condition 2 itself will match
+    // 4. When SASL use is ON.
+    if (this.rpcServer.reservoir == null || skipInitialSaslHandshake || 
!connectionHeaderRead ||
+        useSasl || length < this.rpcServer.minSizeForReservoirUse) {
+      this.data = new SingleByteBuff(ByteBuffer.allocate(length));
+    } else {
+      Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(
+        this.rpcServer.reservoir, this.rpcServer.minSizeForReservoirUse, 
length);
+      this.data = pair.getFirst();
+      this.callCleanup = pair.getSecond();
+    }
+  }
+
+  protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) 
throws IOException {
+    int count = buf.read(channel);
+    if (count > 0) {
+      this.rpcServer.metrics.receivedBytes(count);
+    }
+    return count;
+  }
+
+  /**
+   * Process the data buffer and clean the connection state for the next call.
+   */
+  private void process() throws IOException, InterruptedException {
+    data.rewind();
+    try {
+      if (skipInitialSaslHandshake) {
+        skipInitialSaslHandshake = false;
+        return;
+      }
+
+      if (useSasl) {
+        saslReadAndProcess(data);
+      } else {
+        processOneRpc(data);
+      }
+
+    } finally {
+      dataLengthBuffer.clear(); // Clean for the next call
+      data = null; // For the GC
+      this.callCleanup = null;
+    }
+  }
+
+  private int doBadPreambleHandling(final String msg) throws IOException {
+    return doBadPreambleHandling(msg, new FatalConnectionException(msg));
+  }
+
+  private int doBadPreambleHandling(final String msg, final Exception e) 
throws IOException {
+    SimpleRpcServer.LOG.warn(msg);
+    SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, 
null, null, this, -1,
+        null, null, System.currentTimeMillis(), 0, this.rpcServer.reservoir,
+        this.rpcServer.cellBlockBuilder, null, responder);
+    this.rpcServer.setupResponse(null, fakeCall, e, msg);
+    this.responder.doRespond(fakeCall);
+    // Returning -1 closes out the connection.
+    return -1;
+  }
+
+  @Override
+  public synchronized void close() {
+    disposeSasl();
+    data = null;
+    callCleanup = null;
+    if (!channel.isOpen()) return;
+    try {
+      socket.shutdownOutput();
+    } catch (Exception ignored) {
+      if (SimpleRpcServer.LOG.isTraceEnabled()) {
+        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
+      }
+    }
+    if (channel.isOpen()) {
+      try {
+        channel.close();
+      } catch (Exception ignored) {
+      }
+    }
+    try {
+      socket.close();
+    } catch (Exception ignored) {
+      if (SimpleRpcServer.LOG.isTraceEnabled()) {
+        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
+      }
+    }
+  }
+
+  @Override
+  public boolean isConnectionOpen() {
+    return channel.isOpen();
+  }
+
+  @Override
+  public SimpleServerCall createCall(int id, BlockingService service, 
MethodDescriptor md,
+      RequestHeader header, Message param, CellScanner cellScanner, long size, 
TraceInfo tinfo,
+      InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
+    return new SimpleServerCall(id, service, md, header, param, cellScanner, 
this, size, tinfo,
+        remoteAddress, System.currentTimeMillis(), timeout, 
this.rpcServer.reservoir,
+        this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
index cfb0e02..eb325ad 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.Locale;
 import java.util.Map;
+import java.util.function.Consumer;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -34,14 +35,13 @@ import javax.security.sasl.RealmCallback;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
 
 /**
  * A utility class for dealing with SASL on RPC server
@@ -79,13 +79,12 @@ public class HBaseSaslRpcServer {
   /** CallbackHandler for SASL DIGEST-MD5 mechanism */
   public static class SaslDigestCallbackHandler implements CallbackHandler {
     private SecretManager<TokenIdentifier> secretManager;
-    private RpcServer.Connection connection;
+    private Consumer<UserGroupInformation> attemptingUserConsumer;
 
-    public SaslDigestCallbackHandler(
-        SecretManager<TokenIdentifier> secretManager,
-        RpcServer.Connection connection) {
+    public SaslDigestCallbackHandler(SecretManager<TokenIdentifier> 
secretManager,
+        Consumer<UserGroupInformation> attemptingUserConsumer) {
       this.secretManager = secretManager;
-      this.connection = connection;
+      this.attemptingUserConsumer = attemptingUserConsumer;
     }
 
     private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
@@ -116,12 +115,11 @@ public class HBaseSaslRpcServer {
       if (pc != null) {
         TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), 
secretManager);
         char[] password = getPassword(tokenIdentifier);
-        UserGroupInformation user = null;
-        user = tokenIdentifier.getUser(); // may throw exception
-        connection.attemptingUser = user;
+        UserGroupInformation user = tokenIdentifier.getUser(); // may throw 
exception
+        attemptingUserConsumer.accept(user);
         if (LOG.isTraceEnabled()) {
-          LOG.trace("SASL server DIGEST-MD5 callback: setting password "
-              + "for client: " + tokenIdentifier.getUser());
+          LOG.trace("SASL server DIGEST-MD5 callback: setting password " + 
"for client: " +
+              tokenIdentifier.getUser());
         }
         pc.setPassword(password);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 581e50e..2bd750e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -315,9 +315,10 @@ public abstract class AbstractTestIPC {
           new InetSocketAddress("localhost", 0), conf, scheduler);
     }
 
-    class FailingConnection extends Connection {
-      public FailingConnection(SocketChannel channel, long lastContact) {
-        super(channel, lastContact);
+    class FailingConnection extends SimpleServerRpcConnection {
+      public FailingConnection(TestFailingRpcServer rpcServer, SocketChannel 
channel,
+          long lastContact) {
+        super(rpcServer, channel, lastContact);
       }
 
       @Override
@@ -329,8 +330,8 @@ public abstract class AbstractTestIPC {
     }
 
     @Override
-    protected Connection getConnection(SocketChannel channel, long time) {
-      return new FailingConnection(channel, time);
+    protected SimpleServerRpcConnection getConnection(SocketChannel channel, 
long time) {
+      return new FailingConnection(this, channel, time);
     }
   }
 

Reply via email to