HBASE-15407 Add SASL support for fan out OutputStream

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6ea49945
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6ea49945
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6ea49945

Branch: refs/heads/hbase-12439
Commit: 6ea4994569e05ff44e0fa571e053cef828ab57ed
Parents: e450d94
Author: zhangduo <zhang...@apache.org>
Authored: Sun Mar 27 19:01:05 2016 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri Apr 8 21:46:47 2016 +0800

----------------------------------------------------------------------
 .../util/FanOutOneBlockAsyncDFSOutput.java      |   38 +-
 .../FanOutOneBlockAsyncDFSOutputHelper.java     |  230 ++--
 .../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 1032 ++++++++++++++++++
 .../util/TestFanOutOneBlockAsyncDFSOutput.java  |   13 +-
 .../TestSaslFanOutOneBlockAsyncDFSOutput.java   |  192 ++++
 5 files changed, 1385 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6ea49945/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java
index b10f180..bdbf865 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java
@@ -17,11 +17,26 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import static io.netty.handler.timeout.IdleState.READER_IDLE;
+import static io.netty.handler.timeout.IdleState.WRITER_IDLE;
 import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
 import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
 import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
 import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoop;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -36,6 +51,8 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Supplier;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -52,23 +69,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.base.Supplier;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoop;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
-
 /**
  * An asynchronous HDFS output stream implementation which fans out data to datanode and only
  * supports writing file with only one block.
@@ -278,7 +278,7 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable {
       public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
         if (evt instanceof IdleStateEvent) {
           IdleStateEvent e = (IdleStateEvent) evt;
-          if (e.state() == IdleState.READER_IDLE) {
+          if (e.state() == READER_IDLE) {
             failed(ctx.channel(), new Supplier<Throwable>() {
 
               @Override
@@ -286,7 +286,7 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable {
                 return new IOException("Timeout(" + timeoutMs + "ms) waiting for response");
               }
             });
-          } else if (e.state() == IdleState.WRITER_IDLE) {
+          } else if (e.state() == WRITER_IDLE) {
             PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
             int len = heartbeat.getSerializedSize();
             ByteBuf buf = alloc.buffer(len);

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ea49945/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
index 32fe48b..2225191 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -18,12 +18,36 @@
 package org.apache.hadoop.hbase.util;
 
 import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static io.netty.handler.timeout.IdleState.READER_IDLE;
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoop;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -33,6 +57,10 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.CodedOutputStream;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -68,46 +96,21 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.CodedOutputStream;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoop;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
-
 /**
  * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
  */
 @InterfaceAudience.Private
-public class FanOutOneBlockAsyncDFSOutputHelper {
+public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputHelper.class);
 
@@ -167,6 +170,7 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
   // This is used to terminate a recoverFileLease call when FileSystem is already closed.
   // isClientRunning is not public so we need to use reflection.
   private interface DFSClientAdaptor {
+
     boolean isClientRunning(DFSClient client);
   }
 
@@ -174,14 +178,14 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static DFSClientAdaptor createDFSClientAdaptor() {
     try {
-      final Method method = DFSClient.class.getDeclaredMethod("isClientRunning");
-      method.setAccessible(true);
+      final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
+      isClientRunningMethod.setAccessible(true);
       return new DFSClientAdaptor() {
 
         @Override
         public boolean isClientRunning(DFSClient client) {
           try {
-            return (Boolean) method.invoke(client);
+            return (Boolean) isClientRunningMethod.invoke(client);
           } catch (IllegalAccessException | InvocationTargetException e) {
             throw new RuntimeException(e);
           }
@@ -194,11 +198,11 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static LeaseManager createLeaseManager() {
     try {
-      final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
-        long.class, DFSOutputStream.class);
+      final Method beginFileLeaseMethod =
+          DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
       beginFileLeaseMethod.setAccessible(true);
-      final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
-        long.class);
+      final Method endFileLeaseMethod =
+          DFSClient.class.getDeclaredMethod("endFileLease", long.class);
       endFileLeaseMethod.setAccessible(true);
       return new LeaseManager() {
 
@@ -224,11 +228,11 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
       LOG.warn("No inodeId related lease methods found, should be hadoop 
2.4-", e);
     }
     try {
-      final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
-        String.class, DFSOutputStream.class);
+      final Method beginFileLeaseMethod =
+          DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
       beginFileLeaseMethod.setAccessible(true);
-      final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
-        String.class);
+      final Method endFileLeaseMethod =
+          DFSClient.class.getDeclaredMethod("endFileLease", String.class);
       endFileLeaseMethod.setAccessible(true);
       return new LeaseManager() {
 
@@ -261,18 +265,19 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
       @SuppressWarnings("rawtypes")
       Class<? extends Enum> ecnClass;
       try {
-        ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
-            .asSubclass(Enum.class);
+        ecnClass =
+            Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
+                .asSubclass(Enum.class);
       } catch (ClassNotFoundException e) {
         throw new Error(e);
       }
       @SuppressWarnings("unchecked")
       final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
       final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
-      final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass,
-        Status.class);
-      final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader",
-        int.class);
+      final Method combineHeaderMethod =
+          PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
+      final Method getStatusFromHeaderMethod =
+          PipelineAck.class.getMethod("getStatusFromHeader", int.class);
       return new PipelineAckStatusGetter() {
 
         @Override
@@ -317,8 +322,8 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
   private static StorageTypeSetter createStorageTypeSetter() {
     final Method setStorageTypeMethod;
     try {
-      setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType",
-        StorageTypeProto.class);
+      setStorageTypeMethod =
+          OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
     } catch (NoSuchMethodException e) {
       LOG.warn("noSetStorageType method found, should be hadoop 2.5-", e);
       return new StorageTypeSetter() {
@@ -362,8 +367,8 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
                 String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
                 short replication, long blockSize) throws IOException {
               try {
-                return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
-                  createParent, replication, blockSize);
+                return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
+                  flag, createParent, replication, blockSize);
               } catch (IllegalAccessException e) {
                 throw new RuntimeException(e);
               } catch (InvocationTargetException e) {
@@ -374,16 +379,16 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
           };
         } else {
           try {
-            Class<?> cryptoProtocolVersionClass = Class
-                .forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
+            Class<?> cryptoProtocolVersionClass =
+                Class.forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
             Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
             final Object supported = supportedMethod.invoke(null);
             return new FileCreater() {
 
               @Override
-              public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
-                  String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
-                  short replication, long blockSize) throws IOException {
+              public HdfsFileStatus create(ClientProtocol namenode, String src,
+                  FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag,
+                  boolean createParent, short replication, long blockSize) throws IOException {
                 try {
                   return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
                     flag, createParent, replication, blockSize, supported);
@@ -481,8 +486,12 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
           }
           // success
           ChannelPipeline p = ctx.pipeline();
-          while (p.first() != null) {
-            p.removeFirst();
+          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
+            // do not remove all handlers because we may have wrap or unwrap handlers at the header
+            // of pipeline.
+            if (handler instanceof IdleStateHandler) {
+              break;
+            }
           }
           // Disable auto read here. Enable it after we setup the streaming pipeline in
           // FanOutOneBLockAsyncDFSOutput.
@@ -497,8 +506,7 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
 
         @Override
         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-          if (evt instanceof IdleStateEvent
-              && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
+          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
             promise
                 .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
           } else {
@@ -515,39 +523,64 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static void requestWriteBlock(Channel channel, Enum<?> storageType,
       OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
-    // TODO: SASL negotiation. should be done using a netty Handler.
     OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
     int protoLen = proto.getSerializedSize();
-    ByteBuf buffer = channel.alloc()
-        .buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
+    ByteBuf buffer =
+        channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
     buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
     buffer.writeByte(Op.WRITE_BLOCK.code);
     proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
     channel.writeAndFlush(buffer);
   }
 
-  private static List<Future<Channel>> connectToDataNodes(Configuration conf, String clientName,
-      LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, BlockConstructionStage stage,
-      DataChecksum summer, EventLoop eventLoop) {
+  private static void initialize(Configuration conf, final Channel channel,
+      final DatanodeInfo dnInfo, final Enum<?> storageType,
+      final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs,
+      DFSClient client, Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) {
+    Promise<Void> saslPromise = channel.eventLoop().newPromise();
+    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
+    saslPromise.addListener(new FutureListener<Void>() {
+
+      @Override
+      public void operationComplete(Future<Void> future) throws Exception {
+        if (future.isSuccess()) {
+          // setup response processing pipeline first, then send request.
+          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
+          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
+        } else {
+          promise.tryFailure(future.cause());
+        }
+      }
+    });
+  }
+
+  private static List<Future<Channel>> connectToDataNodes(final Configuration conf,
+      final DFSClient client, String clientName, final LocatedBlock locatedBlock,
+      long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer,
+      EventLoop eventLoop) {
     Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
     DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
-    boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
-      DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
-    final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
-      HdfsServerConstants.READ_TIMEOUT);
+    boolean connectToDnViaHostname =
+        conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+    final int timeoutMs =
+        conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT);
     ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
     blockCopy.setNumBytes(locatedBlock.getBlockSize());
-    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
-        .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy))
-            .setToken(PBHelper.convert(locatedBlock.getBlockToken())))
-        .setClientName(clientName).build();
+    ClientOperationHeaderProto header =
+        ClientOperationHeaderProto
+            .newBuilder()
+            .setBaseHeader(
+              BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy))
+                  .setToken(PBHelper.convert(locatedBlock.getBlockToken())))
+            .setClientName(clientName).build();
     ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
-    final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
-        .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
-        .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
-        .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
-        .setRequestedChecksum(checksumProto)
-        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
+    final OpWriteBlockProto.Builder writeBlockProtoBuilder =
+        OpWriteBlockProto.newBuilder().setHeader(header)
+            .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
+            .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
+            .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
+            .setRequestedChecksum(checksumProto)
+            .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
     List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
     for (int i = 0; i < datanodeInfos.length; i++) {
       final DatanodeInfo dnInfo = datanodeInfos[i];
@@ -562,14 +595,17 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
 
             @Override
             protected void initChannel(Channel ch) throws Exception {
-              processWriteBlockResponse(ch, dnInfo, promise, timeoutMs);
+              // we need to get the remote address of the channel so we can only move on after
+              // channel connected. Leave an empty implementation here because netty does not allow
+              // a null handler.
             }
           }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {
 
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
               if (future.isSuccess()) {
-                requestWriteBlock(future.channel(), storageType, writeBlockProtoBuilder);
+                initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
+                  timeoutMs, client, locatedBlock.getBlockToken(), promise);
               } else {
                 promise.tryFailure(future.cause());
               }
@@ -601,11 +637,14 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
     ClientProtocol namenode = client.getNamenode();
     HdfsFileStatus stat;
     try {
-      stat = FILE_CREATER.create(namenode, src,
-        FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-        new EnumSetWritable<CreateFlag>(
-            overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
-        createParent, replication, blockSize);
+      stat =
+          FILE_CREATER.create(
+            namenode,
+            src,
+            FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)),
+            clientName,
+            new EnumSetWritable<CreateFlag>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet
+                .of(CREATE)), createParent, replication, blockSize);
     } catch (Exception e) {
       if (e instanceof RemoteException) {
         throw (RemoteException) e;
@@ -619,19 +658,20 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
     List<Future<Channel>> futureList = null;
     try {
       DataChecksum summer = createChecksum(client);
-      locatedBlock = namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(),
-        null);
+      locatedBlock =
+          namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), null);
       List<Channel> datanodeList = new ArrayList<>();
-      futureList = connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE,
-        summer, eventLoop);
+      futureList =
+          connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE,
+            summer, eventLoop);
       for (Future<Channel> future : futureList) {
         // fail the creation if there are connection failures since we are fail-fast. The upper
         // layer should retry itself if needed.
         datanodeList.add(future.syncUninterruptibly().getNow());
       }
       succ = true;
-      return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
-          stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC);
+      return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName,
+          src, stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC);
     } finally {
       if (!succ) {
         if (futureList != null) {
@@ -664,8 +704,8 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
 
       @Override
-      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
-          throws IOException, UnresolvedLinkException {
+      public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException,
+          UnresolvedLinkException {
         return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
           blockSize, eventLoop);
       }
@@ -684,8 +724,8 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
     return e.getClassName().endsWith("RetryStartFileException");
   }
 
-  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
-      ExtendedBlock block, long fileId) {
+  static void completeFile(DFSClient client, ClientProtocol namenode, String src,
+      String clientName, ExtendedBlock block, long fileId) {
     for (int retry = 0;; retry++) {
       try {
         if (namenode.complete(src, clientName, block, fileId)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ea49945/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java
new file mode 100644
index 0000000..341d4ec
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -0,0 +1,1032 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static io.netty.handler.timeout.IdleState.READER_IDLE;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
+
+  private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputSaslHelper.class);
+
+  private FanOutOneBlockAsyncDFSOutputSaslHelper() {
+  }
+
+  private static final String SERVER_NAME = "0";
+  private static final String PROTOCOL = "hdfs";
+  private static final String MECHANISM = "DIGEST-MD5";
+  private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
+  private static final String NAME_DELIMITER = " ";
+  private static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
+      "dfs.encrypt.data.transfer.cipher.suites";
+  private static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding";
+
+  private interface SaslAdaptor {
+
+    SaslPropertiesResolver getSaslPropsResolver(DFSClient client);
+
+    TrustedChannelResolver getTrustedChannelResolver(DFSClient client);
+
+    AtomicBoolean getFallbackToSimpleAuth(DFSClient client);
+
+    DataEncryptionKey createDataEncryptionKey(DFSClient client);
+  }
+
+  private static final SaslAdaptor SASL_ADAPTOR;
+
+  private interface CipherHelper {
+
+    List<Object> getCipherOptions(Configuration conf) throws IOException;
+
+    void addCipherOptions(DataTransferEncryptorMessageProto.Builder builder,
+        List<Object> cipherOptions);
+
+    Object getCipherOption(DataTransferEncryptorMessageProto proto, boolean isNegotiatedQopPrivacy,
+        SaslClient saslClient) throws IOException;
+
+    Object getCipherSuite(Object cipherOption);
+
+    byte[] getInKey(Object cipherOption);
+
+    byte[] getInIv(Object cipherOption);
+
+    byte[] getOutKey(Object cipherOption);
+
+    byte[] getOutIv(Object cipherOption);
+  }
+
+  private static final CipherHelper CIPHER_HELPER;
+
+  private static final class CryptoCodec {
+
+    private static final Method CREATE_CODEC;
+
+    private static final Method CREATE_ENCRYPTOR;
+
+    private static final Method CREATE_DECRYPTOR;
+
+    private static final Method INIT_ENCRYPTOR;
+
+    private static final Method INIT_DECRYPTOR;
+
+    private static final Method ENCRYPT;
+
+    private static final Method DECRYPT;
+
+    static {
+      Class<?> cryptoCodecClass = null;
+      try {
+        cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec");
+      } catch (ClassNotFoundException e) {
+        LOG.warn("No CryptoCodec class found, should be hadoop 2.5-", e);
+      }
+      if (cryptoCodecClass != null) {
+        Method getInstanceMethod = null;
+        for (Method method : cryptoCodecClass.getMethods()) {
+          if (method.getName().equals("getInstance") && method.getParameterTypes().length == 2) {
+            getInstanceMethod = method;
+            break;
+          }
+        }
+        CREATE_CODEC = getInstanceMethod;
+        try {
+          CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor");
+          CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor");
+
+          Class<?> encryptorClass = Class.forName("org.apache.hadoop.crypto.Encryptor");
+          INIT_ENCRYPTOR = encryptorClass.getMethod("init");
+          ENCRYPT = encryptorClass.getMethod("encrypt", ByteBuffer.class, ByteBuffer.class);
+
+          Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor");
+          INIT_DECRYPTOR = decryptorClass.getMethod("init");
+          DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class);
+        } catch (NoSuchMethodException | ClassNotFoundException e) {
+          throw new Error(e);
+        }
+      } else {
+        LOG.warn("Can not initialize CryptoCodec, should be hadoop 2.5-");
+        CREATE_CODEC = null;
+        CREATE_ENCRYPTOR = null;
+        CREATE_DECRYPTOR = null;
+        INIT_ENCRYPTOR = null;
+        INIT_DECRYPTOR = null;
+        ENCRYPT = null;
+        DECRYPT = null;
+      }
+    }
+
+    private final Object encryptor;
+
+    private final Object decryptor;
+
+    public CryptoCodec(Configuration conf, Object cipherOption) {
+      Object codec;
+      try {
+        codec = CREATE_CODEC.invoke(null, conf, CIPHER_HELPER.getCipherSuite(cipherOption));
+        encryptor = CREATE_ENCRYPTOR.invoke(codec);
+        byte[] encKey = CIPHER_HELPER.getInKey(cipherOption);
+        byte[] encIv = CIPHER_HELPER.getInIv(cipherOption);
+        INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, encIv.length));
+
+        decryptor = CREATE_DECRYPTOR.invoke(codec);
+        byte[] decKey = CIPHER_HELPER.getOutKey(cipherOption);
+        byte[] decIv = CIPHER_HELPER.getOutIv(cipherOption);
+        INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length));
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
+      try {
+        ENCRYPT.invoke(encryptor, inBuffer, outBuffer);
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
+      try {
+        DECRYPT.invoke(decryptor, inBuffer, outBuffer);
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static SaslAdaptor createSaslAdaptor27(Class<?> saslDataTransferClientClass)
+      throws NoSuchFieldException, NoSuchMethodException {
+    final Field saslPropsResolverField =
+        saslDataTransferClientClass.getDeclaredField("saslPropsResolver");
+    saslPropsResolverField.setAccessible(true);
+    final Field trustedChannelResolverField =
+        saslDataTransferClientClass.getDeclaredField("trustedChannelResolver");
+    trustedChannelResolverField.setAccessible(true);
+    final Field fallbackToSimpleAuthField =
+        saslDataTransferClientClass.getDeclaredField("fallbackToSimpleAuth");
+    fallbackToSimpleAuthField.setAccessible(true);
+    final Method getSaslDataTransferClientMethod =
+        DFSClient.class.getMethod("getSaslDataTransferClient");
+    final Method newDataEncryptionKeyMethod = DFSClient.class.getMethod("newDataEncryptionKey");
+    return new SaslAdaptor() {
+
+      @Override
+      public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
+        try {
+          return (TrustedChannelResolver) trustedChannelResolverField
+              .get(getSaslDataTransferClientMethod.invoke(client));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
+        try {
+          return (SaslPropertiesResolver) saslPropsResolverField
+              .get(getSaslDataTransferClientMethod.invoke(client));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
+        try {
+          return (AtomicBoolean) fallbackToSimpleAuthField.get(getSaslDataTransferClientMethod
+              .invoke(client));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
+        try {
+          return (DataEncryptionKey) newDataEncryptionKeyMethod.invoke(client);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static SaslAdaptor createSaslAdaptor25() {
+    try {
+      final Field trustedChannelResolverField =
+          DFSClient.class.getDeclaredField("trustedChannelResolver");
+      trustedChannelResolverField.setAccessible(true);
+      final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
+      return new SaslAdaptor() {
+
+        @Override
+        public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
+          try {
+            return (TrustedChannelResolver) trustedChannelResolverField.get(client);
+          } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+        @Override
+        public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
+          return null;
+        }
+
+        @Override
+        public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
+          return null;
+        }
+
+        @Override
+        public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
+          try {
+            return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client);
+          } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      };
+    } catch (NoSuchFieldException | NoSuchMethodException e) {
+      throw new Error(e);
+    }
+
+  }
+
+  private static SaslAdaptor createSaslAdaptor() {
+    Class<?> saslDataTransferClientClass = null;
+    try {
+      saslDataTransferClientClass =
+          Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient");
+    } catch (ClassNotFoundException e) {
+      LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-");
+    }
+    try {
+      return saslDataTransferClientClass != null ? createSaslAdaptor27(saslDataTransferClientClass)
+          : createSaslAdaptor25();
+    } catch (NoSuchFieldException | NoSuchMethodException e) {
+      throw new Error(e);
+    }
+  }
+
+  private static CipherHelper createCipherHelper25() {
+    return new CipherHelper() {
+
+      @Override
+      public byte[] getOutKey(Object cipherOption) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public byte[] getOutIv(Object cipherOption) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public byte[] getInKey(Object cipherOption) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public byte[] getInIv(Object cipherOption) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Object getCipherSuite(Object cipherOption) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public List<Object> getCipherOptions(Configuration conf) {
+        return null;
+      }
+
+      @Override
+      public Object getCipherOption(DataTransferEncryptorMessageProto proto,
+          boolean isNegotiatedQopPrivacy, SaslClient saslClient) {
+        return null;
+      }
+
+      @Override
+      public void addCipherOptions(Builder builder, List<Object> cipherOptions) {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  private static CipherHelper createCipherHelper27(Class<?> cipherOptionClass)
+      throws ClassNotFoundException, NoSuchMethodException {
+    @SuppressWarnings("rawtypes")
+    Class<? extends Enum> cipherSuiteClass =
+        Class.forName("org.apache.hadoop.crypto.CipherSuite").asSubclass(Enum.class);
+    @SuppressWarnings("unchecked")
+    final Enum<?> aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING");
+    final Constructor<?> cipherOptionConstructor =
+        cipherOptionClass.getConstructor(cipherSuiteClass);
+    final Constructor<?> cipherOptionWithKeyAndIvConstructor =
+        cipherOptionClass.getConstructor(cipherSuiteClass, byte[].class, byte[].class,
+          byte[].class, byte[].class);
+
+    final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite");
+    final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey");
+    final Method getInIvMethod = cipherOptionClass.getMethod("getInIv");
+    final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey");
+    final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv");
+
+    final Method convertCipherOptionsMethod =
+        PBHelper.class.getMethod("convertCipherOptions", List.class);
+    final Method convertCipherOptionProtosMethod =
+        PBHelper.class.getMethod("convertCipherOptionProtos", List.class);
+    final Method addAllCipherOptionMethod =
+        DataTransferEncryptorMessageProto.Builder.class.getMethod("addAllCipherOption",
+          Iterable.class);
+    final Method getCipherOptionListMethod =
+        DataTransferEncryptorMessageProto.class.getMethod("getCipherOptionList");
+    return new CipherHelper() {
+
+      @Override
+      public byte[] getOutKey(Object cipherOption) {
+        try {
+          return (byte[]) getOutKeyMethod.invoke(cipherOption);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public byte[] getOutIv(Object cipherOption) {
+        try {
+          return (byte[]) getOutIvMethod.invoke(cipherOption);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public byte[] getInKey(Object cipherOption) {
+        try {
+          return (byte[]) getInKeyMethod.invoke(cipherOption);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public byte[] getInIv(Object cipherOption) {
+        try {
+          return (byte[]) getInIvMethod.invoke(cipherOption);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public Object getCipherSuite(Object cipherOption) {
+        try {
+          return getCipherSuiteMethod.invoke(cipherOption);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public List<Object> getCipherOptions(Configuration conf) throws IOException {
+        // Negotiate cipher suites if configured. Currently, the only supported
+        // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
+        // values for future expansion.
+        String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
+        if (cipherSuites == null || cipherSuites.isEmpty()) {
+          return null;
+        }
+        if (!cipherSuites.equals(AES_CTR_NOPADDING)) {
+          throw new IOException(String.format("Invalid cipher suite, %s=%s",
+            DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
+        }
+        Object option;
+        try {
+          option = cipherOptionConstructor.newInstance(aesCipherSuite);
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+        List<Object> cipherOptions = Lists.newArrayListWithCapacity(1);
+        cipherOptions.add(option);
+        return cipherOptions;
+      }
+
+      private Object unwrap(Object option, SaslClient saslClient) throws IOException {
+        byte[] inKey = getInKey(option);
+        if (inKey != null) {
+          inKey = saslClient.unwrap(inKey, 0, inKey.length);
+        }
+        byte[] outKey = getOutKey(option);
+        if (outKey != null) {
+          outKey = saslClient.unwrap(outKey, 0, outKey.length);
+        }
+        try {
+          return cipherOptionWithKeyAndIvConstructor.newInstance(getCipherSuite(option), inKey,
+            getInIv(option), outKey, getOutIv(option));
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public Object getCipherOption(DataTransferEncryptorMessageProto proto,
+          boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
+        List<Object> cipherOptions;
+        try {
+          cipherOptions =
+              (List<Object>) convertCipherOptionProtosMethod.invoke(null,
+                getCipherOptionListMethod.invoke(proto));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+        if (cipherOptions == null || cipherOptions.isEmpty()) {
+          return null;
+        }
+        Object cipherOption = cipherOptions.get(0);
+        return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
+      }
+
+      @Override
+      public void addCipherOptions(Builder builder, List<Object> cipherOptions) {
+        try {
+          addAllCipherOptionMethod.invoke(builder,
+            convertCipherOptionsMethod.invoke(null, cipherOptions));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static CipherHelper createCipherHelper() {
+    Class<?> cipherOptionClass;
+    try {
+      cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption");
+    } catch (ClassNotFoundException e) {
+      LOG.warn("No CipherOption class found, should be hadoop 2.5-");
+      return createCipherHelper25();
+    }
+    try {
+      return createCipherHelper27(cipherOptionClass);
+    } catch (NoSuchMethodException | ClassNotFoundException e) {
+      throw new Error(e);
+    }
+  }
+
+  static {
+    SASL_ADAPTOR = createSaslAdaptor();
+    CIPHER_HELPER = createCipherHelper();
+  }
+
+  /**
+   * Sets user name and password when asked by the client-side SASL object.
+   */
+  private static final class SaslClientCallbackHandler implements CallbackHandler {
+
+    private final char[] password;
+    private final String userName;
+
+    /**
+     * Creates a new SaslClientCallbackHandler.
+     * @param userName SASL user name
+     * @param password SASL password
+     */
+    public SaslClientCallbackHandler(String userName, char[] password) {
+      this.password = password;
+      this.userName = userName;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        pc.setPassword(password);
+      }
+      if (rc != null) {
+        rc.setText(rc.getDefaultText());
+      }
+    }
+  }
+
+  private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
+
+    private final Configuration conf;
+
+    private final Map<String, String> saslProps;
+
+    private final SaslClient saslClient;
+
+    private final int timeoutMs;
+
+    private final Promise<Void> promise;
+
+    private int step = 0;
+
+    public SaslNegotiateHandler(Configuration conf, String username, char[] password,
+        Map<String, String> saslProps, int timeoutMs, Promise<Void> promise) throws SaslException {
+      this.conf = conf;
+      this.saslProps = saslProps;
+      this.saslClient =
+          Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, SERVER_NAME,
+            saslProps, new SaslClientCallbackHandler(username, password));
+      this.timeoutMs = timeoutMs;
+      this.promise = promise;
+    }
+
+    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
+      sendSaslMessage(ctx, payload, null);
+    }
+
+    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, List<Object> options)
+        throws IOException {
+      DataTransferEncryptorMessageProto.Builder builder =
+          DataTransferEncryptorMessageProto.newBuilder();
+      builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+      if (payload != null) {
+        builder.setPayload(ByteString.copyFrom(payload));
+      }
+      if (options != null) {
+        CIPHER_HELPER.addCipherOptions(builder, options);
+      }
+      DataTransferEncryptorMessageProto proto = builder.build();
+      int size = proto.getSerializedSize();
+      size += CodedOutputStream.computeRawVarint32Size(size);
+      ByteBuf buf = ctx.alloc().buffer(size);
+      proto.writeDelimitedTo(new ByteBufOutputStream(buf));
+      ctx.write(buf);
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+      ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
+      sendSaslMessage(ctx, new byte[0]);
+      ctx.flush();
+      step++;
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      saslClient.dispose();
+    }
+
+    private void check(DataTransferEncryptorMessageProto proto) throws IOException {
+      if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+        throw new InvalidEncryptionKeyException(proto.getMessage());
+      } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+        throw new IOException(proto.getMessage());
+      }
+    }
+
+    private String getNegotiatedQop() {
+      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+    }
+
+    private boolean isNegotiatedQopPrivacy() {
+      String qop = getNegotiatedQop();
+      return qop != null && "auth-conf".equalsIgnoreCase(qop);
+    }
+
+    private boolean requestedQopContainsPrivacy() {
+      Set<String> requestedQop =
+          ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      return requestedQop.contains("auth-conf");
+    }
+
+    private void checkSaslComplete() throws IOException {
+      if (!saslClient.isComplete()) {
+        throw new IOException("Failed to complete SASL handshake");
+      }
+      Set<String> requestedQop =
+          ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      String negotiatedQop = getNegotiatedQop();
+      LOG.debug("Verifying QOP, requested QOP = " + requestedQop + ", 
negotiated QOP = "
+          + negotiatedQop);
+      if (!requestedQop.contains(negotiatedQop)) {
+        throw new IOException(String.format("SASL handshake completed, but "
+            + "channel does not have acceptable quality of protection, "
+            + "requested = %s, negotiated = %s", requestedQop, negotiatedQop));
+      }
+    }
+
+    private boolean useWrap() {
+      String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+      return qop != null && !"auth".equalsIgnoreCase(qop);
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
+      if (msg instanceof DataTransferEncryptorMessageProto) {
+        DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
+        check(proto);
+        byte[] challenge = proto.getPayload().toByteArray();
+        byte[] response = saslClient.evaluateChallenge(challenge);
+        switch (step) {
+          case 1: {
+            List<Object> cipherOptions = null;
+            if (requestedQopContainsPrivacy()) {
+              cipherOptions = CIPHER_HELPER.getCipherOptions(conf);
+            }
+            sendSaslMessage(ctx, response, cipherOptions);
+            ctx.flush();
+            step++;
+            break;
+          }
+          case 2: {
+            assert response == null;
+            checkSaslComplete();
+            Object cipherOption =
+                CIPHER_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
+            ChannelPipeline p = ctx.pipeline();
+            while (p.first() != null) {
+              p.removeFirst();
+            }
+            if (cipherOption != null) {
+              CryptoCodec codec = new CryptoCodec(conf, cipherOption);
+              p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
+            } else {
+              if (useWrap()) {
+                p.addLast(new SaslWrapHandler(saslClient), new LengthFieldBasedFrameDecoder(
+                    Integer.MAX_VALUE, 0, 4), new SaslUnwrapHandler(saslClient));
+              }
+            }
+            promise.trySuccess(null);
+            break;
+          }
+          default:
+            throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
+        }
+      } else {
+        ctx.fireChannelRead(msg);
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+      promise.tryFailure(cause);
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+      if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+        promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+      } else {
+        super.userEventTriggered(ctx, evt);
+      }
+    }
+  }
+
+  private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    private final SaslClient saslClient;
+
+    public SaslUnwrapHandler(SaslClient saslClient) {
+      this.saslClient = saslClient;
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      saslClient.dispose();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+      msg.skipBytes(4);
+      byte[] b = new byte[msg.readableBytes()];
+      msg.readBytes(b);
+      ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length)));
+    }
+  }
+
+  private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
+
+    private final SaslClient saslClient;
+
+    private CompositeByteBuf cBuf;
+
+    public SaslWrapHandler(SaslClient saslClient) {
+      this.saslClient = saslClient;
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+      cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+        throws Exception {
+      if (msg instanceof ByteBuf) {
+        ByteBuf buf = (ByteBuf) msg;
+        cBuf.addComponent(buf);
+        cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
+      } else {
+        ctx.write(msg);
+      }
+    }
+
+    @Override
+    public void flush(ChannelHandlerContext ctx) throws Exception {
+      if (cBuf.isReadable()) {
+        byte[] b = new byte[cBuf.readableBytes()];
+        cBuf.readBytes(b);
+        cBuf.discardReadComponents();
+        byte[] wrapped = saslClient.wrap(b, 0, b.length);
+        ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
+        buf.writeInt(wrapped.length);
+        buf.writeBytes(wrapped);
+        ctx.write(buf);
+      }
+      ctx.flush();
+    }
+
+    @Override
+    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+      cBuf.release();
+      cBuf = null;
+    }
+  }
+
+  private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    private final CryptoCodec codec;
+
+    public DecryptHandler(CryptoCodec codec) {
+      this.codec = codec;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+      ByteBuf inBuf;
+      boolean release = false;
+      if (msg.nioBufferCount() == 1) {
+        inBuf = msg;
+      } else {
+        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
+        msg.readBytes(inBuf);
+        release = true;
+      }
+      ByteBuffer inBuffer = inBuf.nioBuffer();
+      ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
+      ByteBuffer outBuffer = outBuf.nioBuffer();
+      codec.decrypt(inBuffer, outBuffer);
+      outBuf.writerIndex(inBuf.readableBytes());
+      if (release) {
+        inBuf.release();
+      }
+      ctx.fireChannelRead(outBuf);
+    }
+  }
+
+  private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
+
+    private final CryptoCodec codec;
+
+    public EncryptHandler(CryptoCodec codec) {
+      super(false);
+      this.codec = codec;
+    }
+
+    @Override
+    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
+        throws Exception {
+      if (preferDirect) {
+        return ctx.alloc().directBuffer(msg.readableBytes());
+      } else {
+        return ctx.alloc().buffer(msg.readableBytes());
+      }
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
+      ByteBuf inBuf;
+      boolean release = false;
+      if (msg.nioBufferCount() == 1) {
+        inBuf = msg;
+      } else {
+        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
+        msg.readBytes(inBuf);
+        release = true;
+      }
+      ByteBuffer inBuffer = inBuf.nioBuffer();
+      ByteBuffer outBuffer = out.nioBuffer();
+      codec.encrypt(inBuffer, outBuffer);
+      out.writerIndex(inBuf.readableBytes());
+      if (release) {
+        inBuf.release();
+      }
+    }
+  }
+
+  private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
+    return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
+        + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
+  }
+
+  private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+    return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray();
+  }
+
+  private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
+    return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8);
+  }
+
+  private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
+    return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8)
+        .toCharArray();
+  }
+
+  private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
+    Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
+    saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+    saslProps.put(Sasl.SERVER_AUTH, "true");
+    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+    return saslProps;
+  }
+
+  private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
+      String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise) {
+    try {
+      channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
+        new ProtobufVarint32FrameDecoder(),
+        new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()),
+        new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise));
+    } catch (SaslException e) {
+      saslPromise.tryFailure(e);
+    }
+  }
+
+  static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+      int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
+      Promise<Void> saslPromise) {
+    SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(client);
+    TrustedChannelResolver trustedChannelResolver = SASL_ADAPTOR.getTrustedChannelResolver(client);
+    AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(client);
+    InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
+    if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
+      saslPromise.trySuccess(null);
+      return;
+    }
+    DataEncryptionKey encryptionKey;
+    try {
+      encryptionKey = SASL_ADAPTOR.createDataEncryptionKey(client);
+    } catch (Exception e) {
+      saslPromise.tryFailure(e);
+      return;
+    }
+    if (encryptionKey != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = "
+            + dnInfo);
+      }
+      doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
+        encryptionKeyToPassword(encryptionKey.encryptionKey),
+        createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise);
+    } else if (!UserGroupInformation.isSecurityEnabled()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
+            + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (dnInfo.getXferPort() < 1024) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with "
+            + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with "
+            + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (saslPropsResolver != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client doing general handshake for addr = " + addr + ", datanodeId = "
+            + dnInfo);
+      }
+      doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
+        buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise);
+    } else {
+      // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
+      // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
+      // edge case.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
+            + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    }
+  }
+
+}
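
For readers skimming this patch, here is a minimal caller-side sketch of how the entry point above is meant to be driven. It is not part of the commit: the wrapper class and the negotiate() method are hypothetical, and the call assumes code in the same org.apache.hadoop.hbase.util package, since trySaslNegotiate is package-private. The point it illustrates is that completion of the Promise, not a return value, signals that the pipeline (including any negotiated wrap or encryption handlers) is ready for the block write.

package org.apache.hadoop.hbase.util;

import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

class SaslNegotiateUsageSketch {

  static void negotiate(Configuration conf, final Channel channel, DatanodeInfo dnInfo,
      int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken) {
    // One promise per datanode connection; success means the handshake finished
    // (or was legitimately skipped) and any wrap/encrypt handlers are installed.
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate(conf, channel, dnInfo, timeoutMs,
      client, accessToken, saslPromise);
    saslPromise.addListener(new FutureListener<Void>() {
      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // Safe to continue with block-write setup on this channel.
        } else {
          // Negotiation failed or timed out; tear the connection down.
          channel.close();
        }
      }
    });
  }
}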

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ea49945/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java
index 09cd61e..a10712e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java
@@ -102,8 +102,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     }
   }
 
-  private void writeAndVerify(EventLoop eventLoop, Path f, final FanOutOneBlockAsyncDFSOutput out)
-      throws IOException, InterruptedException, ExecutionException {
+  static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f,
+      final FanOutOneBlockAsyncDFSOutput out)
+          throws IOException, InterruptedException, ExecutionException {
     final byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
     final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
@@ -117,9 +118,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     });
     assertEquals(b.length, handler.get());
     out.close();
-    assertEquals(b.length, FS.getFileStatus(f).getLen());
+    assertEquals(b.length, dfs.getFileStatus(f).getLen());
     byte[] actual = new byte[b.length];
-    try (FSDataInputStream in = FS.open(f)) {
+    try (FSDataInputStream in = dfs.open(f)) {
       in.readFully(actual);
     }
     assertArrayEquals(b, actual);
@@ -131,7 +132,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
       true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
-    writeAndVerify(eventLoop, f, out);
+    writeAndVerify(eventLoop, FS, f, out);
   }
 
   @Test
@@ -191,7 +192,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to datanode should still alive.
-    writeAndVerify(eventLoop, f, out);
+    writeAndVerify(eventLoop, FS, f, out);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ea49945/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java
new file mode 100644
index 0000000..2f5e2ff
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+@RunWith(Parameterized.class)
+@Category({ MiscTests.class, MediumTests.class })
+public class TestSaslFanOutOneBlockAsyncDFSOutput {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static DistributedFileSystem FS;
+
+  private static EventLoopGroup EVENT_LOOP_GROUP;
+
+  private static int READ_TIMEOUT_MS = 200000;
+
+  private static final File KEYTAB_FILE = new File(
+      TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
+
+  private static MiniKdc KDC;
+
+  private static String HOST = "localhost";
+
+  private static String USERNAME;
+
+  private static String PRINCIPAL;
+
+  private static String HTTP_PRINCIPAL;
+  @Rule
+  public TestName name = new TestName();
+
+  @Parameter(0)
+  public String protection;
+
+  @Parameter(1)
+  public String encryptionAlgorithm;
+
+  @Parameters(name = "{index}: protection={0}, encryption={1}")
+  public static Iterable<Object[]> data() {
+    List<Object[]> params = new ArrayList<>();
+    for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
+      for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
+        params.add(new Object[] { protection, encryptionAlgorithm });
+      }
+    }
+    return params;
+  }
+
+  private static void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
+    // change XXX_USER_NAME_KEY to XXX_KERBEROS_PRINCIPAL_KEY after we drop support for hadoop-2.4.1
+    conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
+      HTTP_PRINCIPAL + "@" + KDC.getRealm());
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+
+    File keystoresDir = new File(TEST_UTIL.getDataTestDir("keystore").toUri().getPath());
+    keystoresDir.mkdirs();
+    String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestGenerateDelegationToken.class);
+    KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false);
+
+    conf.setBoolean("ignore.secure.ports.for.testing", true);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Logger.getLogger("org.apache.hadoop.hdfs.StateChange").setLevel(Level.DEBUG);
+    Logger.getLogger("BlockStateChange").setLevel(Level.DEBUG);
+    EVENT_LOOP_GROUP = new NioEventLoopGroup();
+    TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
+    Properties conf = MiniKdc.createConf();
+    conf.put(MiniKdc.DEBUG, true);
+    KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
+    KDC.start();
+    USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
+    PRINCIPAL = USERNAME + "/" + HOST;
+    HTTP_PRINCIPAL = "HTTP/" + HOST;
+    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
+    setHdfsSecuredConfiguration(TEST_UTIL.getConfiguration());
+    HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
+    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
+    HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration());
+    UserGroupInformation.setConfiguration(TEST_UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws IOException, InterruptedException {
+    if (EVENT_LOOP_GROUP != null) {
+      EVENT_LOOP_GROUP.shutdownGracefully().sync();
+    }
+    if (KDC != null) {
+      KDC.stop();
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
+    if (StringUtils.isBlank(encryptionAlgorithm)) {
+      TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
+      TEST_UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
+    } else {
+      TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
+      TEST_UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
+    }
+    TEST_UTIL.startMiniDFSCluster(3);
+    FS = TEST_UTIL.getDFSCluster().getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    TEST_UTIL.shutdownMiniDFSCluster();
+  }
+
+  private Path getTestFile() {
+    return new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException, ExecutionException {
+    Path f = getTestFile();
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      true, false, (short) 1, FS.getDefaultBlockSize(), eventLoop);
+    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, f, out);
+  }
+}
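
Outside of the mini-cluster test above, the same knobs govern whether the new SASL negotiation actually kicks in on a real deployment. A rough sketch mirroring setUp() follows; the helper class is hypothetical, but the key names are the standard HDFS ones already imported by this test:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

final class SaslDfsConfSketch {

  // Builds a Configuration that requests the given data transfer protection
  // ("authentication", "integrity" or "privacy") and, optionally, full data
  // transfer encryption with the given algorithm (e.g. "3des" or "rc4").
  static Configuration forProtection(String protection, String encryptionAlgorithm) {
    Configuration conf = new Configuration();
    conf.set("dfs.data.transfer.protection", protection);
    if (encryptionAlgorithm == null || encryptionAlgorithm.isEmpty()) {
      conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
      conf.unset(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
    } else {
      conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
      conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
    }
    return conf;
  }
}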
