Revert "HDFS-8934. Move ShortCircuitShm to hdfs-client. Contributed by 
Mingliang Liu."

This reverts commit 8e4afa3a671583c95263218b85cf6bfbc1e43635.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a727c6db
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a727c6db
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a727c6db

Branch: refs/heads/branch-2
Commit: a727c6db0530aff5dcccb4181ba83e93e543ac5c
Parents: 87d0133
Author: Andrew Wang <w...@apache.org>
Authored: Mon Aug 24 11:51:46 2015 -0700
Committer: Andrew Wang <w...@apache.org>
Committed: Mon Aug 24 11:51:46 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/ExtendedBlockId.java |  82 ---
 .../org/apache/hadoop/hdfs/net/DomainPeer.java  | 132 ----
 .../java/org/apache/hadoop/hdfs/net/Peer.java   | 123 ----
 .../datatransfer/BlockConstructionStage.java    |  62 --
 .../datatransfer/DataTransferProtoUtil.java     | 146 -----
 .../datatransfer/DataTransferProtocol.java      | 202 ------
 .../hadoop/hdfs/protocol/datatransfer/Op.java   |  66 --
 .../hdfs/protocol/datatransfer/Sender.java      | 261 --------
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  | 254 --------
 .../token/block/InvalidBlockTokenException.java |  41 --
 .../hdfs/server/datanode/CachingStrategy.java   |  76 ---
 .../hadoop/hdfs/shortcircuit/DfsClientShm.java  | 119 ----
 .../hdfs/shortcircuit/DfsClientShmManager.java  | 522 ---------------
 .../hdfs/shortcircuit/ShortCircuitShm.java      | 647 -------------------
 .../hadoop/hdfs/util/ExactSizeInputStream.java  | 125 ----
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 -
 .../apache/hadoop/hdfs/BlockReaderFactory.java  |   4 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  10 +-
 .../org/apache/hadoop/hdfs/DataStreamer.java    |   6 +-
 .../org/apache/hadoop/hdfs/ExtendedBlockId.java |  82 +++
 .../apache/hadoop/hdfs/RemoteBlockReader.java   |   4 +-
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  |   4 +-
 .../org/apache/hadoop/hdfs/net/DomainPeer.java  | 132 ++++
 .../java/org/apache/hadoop/hdfs/net/Peer.java   | 123 ++++
 .../datatransfer/BlockConstructionStage.java    |  62 ++
 .../datatransfer/DataTransferProtoUtil.java     | 148 +++++
 .../datatransfer/DataTransferProtocol.java      | 201 ++++++
 .../hadoop/hdfs/protocol/datatransfer/Op.java   |  66 ++
 .../hdfs/protocol/datatransfer/PipelineAck.java |   2 +-
 .../hdfs/protocol/datatransfer/Receiver.java    |   7 +-
 .../hdfs/protocol/datatransfer/Sender.java      | 261 ++++++++
 .../datatransfer/sasl/DataTransferSaslUtil.java |   2 +-
 ...tDatanodeProtocolServerSideTranslatorPB.java |   2 +-
 .../ClientDatanodeProtocolTranslatorPB.java     |   6 +-
 ...tNamenodeProtocolServerSideTranslatorPB.java |   6 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |  28 +-
 .../DatanodeProtocolClientSideTranslatorPB.java |   4 +-
 .../InterDatanodeProtocolTranslatorPB.java      |   2 +-
 .../NamenodeProtocolTranslatorPB.java           |   2 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 228 ++++++-
 .../token/block/InvalidBlockTokenException.java |  41 ++
 .../hadoop/hdfs/server/balancer/Dispatcher.java |   2 +-
 .../hdfs/server/datanode/CachingStrategy.java   |  76 +++
 .../hadoop/hdfs/server/datanode/DataNode.java   |   4 +-
 .../hdfs/server/datanode/DataXceiver.java       |  14 +-
 .../server/namenode/FSImageFormatPBINode.java   |   5 +-
 .../hadoop/hdfs/shortcircuit/DfsClientShm.java  | 119 ++++
 .../hdfs/shortcircuit/DfsClientShmManager.java  | 514 +++++++++++++++
 .../hdfs/shortcircuit/ShortCircuitCache.java    |   4 +-
 .../hdfs/shortcircuit/ShortCircuitShm.java      | 646 ++++++++++++++++++
 .../hadoop/hdfs/util/ExactSizeInputStream.java  | 125 ++++
 .../hadoop/hdfs/protocolPB/TestPBHelper.java    |  20 +-
 52 files changed, 2873 insertions(+), 2949 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
deleted file mode 100644
index 7b9e8e3..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.commons.lang.builder.EqualsBuilder;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-
-/**
- * An immutable key which identifies a block.
- */
-@InterfaceAudience.Private
-final public class ExtendedBlockId {
-  /**
-   * The block ID for this block.
-   */
-  private final long blockId;
-
-  /**
-   * The block pool ID for this block.
-   */
-  private final String bpId;
-
-  public static ExtendedBlockId fromExtendedBlock(ExtendedBlock block) {
-    return new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
-  }
-  
-  public ExtendedBlockId(long blockId, String bpId) {
-    this.blockId = blockId;
-    this.bpId = bpId;
-  }
-
-  public long getBlockId() {
-    return this.blockId;
-  }
-
-  public String getBlockPoolId() {
-    return this.bpId;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if ((o == null) || (o.getClass() != this.getClass())) {
-      return false;
-    }
-    ExtendedBlockId other = (ExtendedBlockId)o;
-    return new EqualsBuilder().
-        append(blockId, other.blockId).
-        append(bpId, other.bpId).
-        isEquals();
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().
-        append(this.blockId).
-        append(this.bpId).
-        toHashCode();
-  }
-
-  @Override
-  public String toString() {
-    return new StringBuilder().append(blockId).
-        append("_").append(bpId).toString();
-  }
-}
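
As context for the move, a minimal usage sketch of ExtendedBlockId as a
value-based map key (illustrative only, not part of this diff; the block ID,
pool ID, and path below are made up):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hdfs.ExtendedBlockId;

    class ExtendedBlockIdDemo {
      public static void main(String[] args) {
        Map<ExtendedBlockId, String> replicaPaths = new HashMap<>();
        replicaPaths.put(new ExtendedBlockId(1073741825L, "BP-1-127.0.0.1-1"),
            "/data/current/blk_1073741825");
        // equals()/hashCode() are value-based (EqualsBuilder/HashCodeBuilder),
        // so a freshly constructed equal key retrieves the same entry.
        System.out.println(replicaPaths.get(
            new ExtendedBlockId(1073741825L, "BP-1-127.0.0.1-1")));
      }
    }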

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
deleted file mode 100644
index 4792b0e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.net;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.channels.ReadableByteChannel;
-
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * Represents a peer that we communicate with by using blocking I/O 
- * on a UNIX domain socket.
- */
-@InterfaceAudience.Private
-public class DomainPeer implements Peer {
-  private final DomainSocket socket;
-  private final OutputStream out;
-  private final InputStream in;
-  private final ReadableByteChannel channel;
-
-  public DomainPeer(DomainSocket socket) {
-    this.socket = socket;
-    this.out = socket.getOutputStream();
-    this.in = socket.getInputStream();
-    this.channel = socket.getChannel();
-  }
-
-  @Override
-  public ReadableByteChannel getInputStreamChannel() {
-    return channel;
-  }
-
-  @Override
-  public void setReadTimeout(int timeoutMs) throws IOException {
-    socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs);
-  }
-
-  @Override
-  public int getReceiveBufferSize() throws IOException {
-    return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
-  }
-
-  @Override
-  public boolean getTcpNoDelay() throws IOException {
-    /* No TCP, no TCP_NODELAY. */
-    return false;
-  }
-
-  @Override
-  public void setWriteTimeout(int timeoutMs) throws IOException {
-    socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs);
-  }
-
-  @Override
-  public boolean isClosed() {
-    return !socket.isOpen();
-  }
-
-  @Override
-  public void close() throws IOException {
-    socket.close();
-  }
-
-  @Override
-  public String getRemoteAddressString() {
-    return "unix:" + socket.getPath();
-  }
-
-  @Override
-  public String getLocalAddressString() {
-    return "<local>";
-  }
-
-  @Override
-  public InputStream getInputStream() throws IOException {
-    return in;
-  }
-
-  @Override
-  public OutputStream getOutputStream() throws IOException {
-    return out;
-  }
-
-  @Override
-  public boolean isLocal() {
-    /* UNIX domain sockets can only be used for local communication. */
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    return "DomainPeer(" + getRemoteAddressString() + ")";
-  }
-
-  @Override
-  public DomainSocket getDomainSocket() {
-    return socket;
-  }
-
-  @Override
-  public boolean hasSecureChannel() {
-    //
-    // Communication over domain sockets is assumed to be secure, since it
-    // doesn't pass over any network.  We also carefully control the privileges
-    // that can be used on the domain socket inode and its parent directories.
-    // See #{java.org.apache.hadoop.net.unix.DomainSocket#validateSocketPathSecurity0}
-    // for details.
-    //
-    // So unless you are running as root or the hdfs superuser, you cannot
-    // launch a man-in-the-middle attack on UNIX domain socket traffic.
-    //
-    return true;
-  }
-}
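
A sketch of how a DomainPeer is typically obtained and used (not from this
diff; the socket path is an assumption, real clients take it from
dfs.domain.socket.path):

    import java.io.OutputStream;
    import org.apache.hadoop.hdfs.net.DomainPeer;
    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.net.unix.DomainSocket;

    class DomainPeerDemo {
      public static void main(String[] args) throws Exception {
        // Connect over the datanode's UNIX domain socket (path assumed).
        DomainSocket sock = DomainSocket.connect("/var/run/hdfs/dn_socket");
        // Peer extends Closeable, so try-with-resources closes the socket.
        try (Peer peer = new DomainPeer(sock)) {
          peer.setReadTimeout(60000);   // maps to DomainSocket.RECEIVE_TIMEOUT
          peer.setWriteTimeout(60000);  // maps to DomainSocket.SEND_TIMEOUT
          OutputStream out = peer.getOutputStream();
          // ... write a data transfer request, then read the response ...
        }
      }
    }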

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
deleted file mode 100644
index 42cf287..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.net;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.channels.ReadableByteChannel;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.net.unix.DomainSocket;
-
-/**
- * Represents a connection to a peer.
- */
-@InterfaceAudience.Private
-public interface Peer extends Closeable {
-  /**
-   * @return                The input stream channel associated with this
-   *                        peer, or null if it has none.
-   */
-  public ReadableByteChannel getInputStreamChannel();
-
-  /**
-   * Set the read timeout on this peer.
-   *
-   * @param timeoutMs       The timeout in milliseconds.
-   */
-  public void setReadTimeout(int timeoutMs) throws IOException;
-
-  /**
-   * @return                The receive buffer size.
-   */
-  public int getReceiveBufferSize() throws IOException;
-
-  /**
-   * @return                True if TCP_NODELAY is turned on.
-   */
-  public boolean getTcpNoDelay() throws IOException;
-
-  /**
-   * Set the write timeout on this peer.
-   *
-   * Note: this is not honored for BasicInetPeer.
-   * See {@link BasicInetPeer#setWriteTimeout} for details.
-   * 
-   * @param timeoutMs       The timeout in milliseconds.
-   */
-  public void setWriteTimeout(int timeoutMs) throws IOException;
-
-  /**
-   * @return                true only if the peer is closed.
-   */
-  public boolean isClosed();
-  
-  /**
-   * Close the peer.
-   *
-   * It's safe to re-close a Peer that is already closed.
-   */
-  public void close() throws IOException;
-
-  /**
-   * @return               A string representing the remote end of our 
-   *                       connection to the peer.
-   */
-  public String getRemoteAddressString();
-
-  /**
-   * @return               A string representing the local end of our 
-   *                       connection to the peer.
-   */
-  public String getLocalAddressString();
-  
-  /**
-   * @return               An InputStream associated with the Peer.
-   *                       This InputStream will be valid until you close
-   *                       this peer with Peer#close.
-   */
-  public InputStream getInputStream() throws IOException;
-  
-  /**
-   * @return               An OutputStream associated with the Peer.
-   *                       This OutputStream will be valid until you close
-   *                       this peer with Peer#close.
-   */
-  public OutputStream getOutputStream() throws IOException;
-
-  /**
-   * @return               True if the peer resides on the same
-   *                       computer as we do.
-   */
-  public boolean isLocal();
-
-  /**
-   * @return               The DomainSocket associated with the current
-   *                       peer, or null if there is none.
-   */
-  public DomainSocket getDomainSocket();
-  
-  /**
-   * Return true if the channel is secure.
-   *
-   * @return               True if our channel to this peer is not
-   *                       susceptible to man-in-the-middle attacks.
-   */
-  public boolean hasSecureChannel();
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
deleted file mode 100644
index 5f86e52..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/** Block Construction Stage */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public enum BlockConstructionStage {
-  /** The constants are always listed as a regular stage followed by its
-   * recovery stage.
-   * Changing this order will break getRecoveryStage.
-   */
-  // pipeline set up for block append
-  PIPELINE_SETUP_APPEND,
-  // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
-  PIPELINE_SETUP_APPEND_RECOVERY,
-  // data streaming
-  DATA_STREAMING,
-  // pipeline setup for failed data streaming recovery
-  PIPELINE_SETUP_STREAMING_RECOVERY,
-  // close the block and pipeline
-  PIPELINE_CLOSE,
-  // Recover a failed PIPELINE_CLOSE
-  PIPELINE_CLOSE_RECOVERY,
-  // pipeline set up for block creation
-  PIPELINE_SETUP_CREATE,
-  // transfer RBW for adding datanodes
-  TRANSFER_RBW,
-  // transfer Finalized for adding datanodes
-  TRANSFER_FINALIZED;
-  
-  final static private byte RECOVERY_BIT = (byte)1;
-  
-  /**
-   * get the recovery stage of this stage
-   */
-  public BlockConstructionStage getRecoveryStage() {
-    if (this == PIPELINE_SETUP_CREATE) {
-      throw new IllegalArgumentException( "Unexpected blockStage " + this);
-    } else {
-      return values()[ordinal()|RECOVERY_BIT];
-    }
-  }
-}    
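
A worked example of the ordinal()|RECOVERY_BIT trick (a sketch, not part of
this diff): the constants are declared in regular/recovery pairs at even/odd
ordinals, so OR-ing in the low bit maps a regular stage to its recovery twin
and leaves a recovery stage unchanged.

    import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.*;

    class RecoveryStageDemo {
      public static void main(String[] args) {
        // ordinal 0 -> 1
        assert PIPELINE_SETUP_APPEND.getRecoveryStage() == PIPELINE_SETUP_APPEND_RECOVERY;
        // ordinal 2 -> 3
        assert DATA_STREAMING.getRecoveryStage() == PIPELINE_SETUP_STREAMING_RECOVERY;
        // ordinal 4 -> 5
        assert PIPELINE_CLOSE.getRecoveryStage() == PIPELINE_CLOSE_RECOVERY;
        // PIPELINE_SETUP_CREATE (ordinal 6) has no recovery twin at ordinal 7
        // (that slot is TRANSFER_RBW), hence the IllegalArgumentException.
      }
    }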

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
deleted file mode 100644
index 28097ab..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceInfo;
-import org.apache.htrace.TraceScope;
-
-/**
- * Static utilities for dealing with the protocol buffers used by the
- * Data Transfer Protocol.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public abstract class DataTransferProtoUtil {
-  static BlockConstructionStage fromProto(
-      OpWriteBlockProto.BlockConstructionStage stage) {
-    return BlockConstructionStage.valueOf(stage.name());
-  }
-
-  static OpWriteBlockProto.BlockConstructionStage toProto(
-      BlockConstructionStage stage) {
-    return OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name());
-  }
-
-  public static ChecksumProto toProto(DataChecksum checksum) {
-    ChecksumTypeProto type = PBHelperClient.convert(checksum.getChecksumType());
-    // ChecksumType#valueOf never returns null
-    return ChecksumProto.newBuilder()
-      .setBytesPerChecksum(checksum.getBytesPerChecksum())
-      .setType(type)
-      .build();
-  }
-
-  public static DataChecksum fromProto(ChecksumProto proto) {
-    if (proto == null) return null;
-
-    int bytesPerChecksum = proto.getBytesPerChecksum();
-    DataChecksum.Type type = PBHelperClient.convert(proto.getType());
-    return DataChecksum.newDataChecksum(type, bytesPerChecksum);
-  }
-
-  static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
-      String client, Token<BlockTokenIdentifier> blockToken) {
-    ClientOperationHeaderProto header =
-      ClientOperationHeaderProto.newBuilder()
-        .setBaseHeader(buildBaseHeader(blk, blockToken))
-        .setClientName(client)
-        .build();
-    return header;
-  }
-
-  static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
-      Token<BlockTokenIdentifier> blockToken) {
-    BaseHeaderProto.Builder builder =  BaseHeaderProto.newBuilder()
-      .setBlock(PBHelperClient.convert(blk))
-      .setToken(PBHelperClient.convert(blockToken));
-    if (Trace.isTracing()) {
-      Span s = Trace.currentSpan();
-      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
-          .setTraceId(s.getTraceId())
-          .setParentId(s.getSpanId()));
-    }
-    return builder.build();
-  }
-
-  public static TraceInfo fromProto(DataTransferTraceInfoProto proto) {
-    if (proto == null) return null;
-    if (!proto.hasTraceId()) return null;
-    return new TraceInfo(proto.getTraceId(), proto.getParentId());
-  }
-
-  public static TraceScope continueTraceSpan(ClientOperationHeaderProto header,
-      String description) {
-    return continueTraceSpan(header.getBaseHeader(), description);
-  }
-
-  public static TraceScope continueTraceSpan(BaseHeaderProto header,
-      String description) {
-    return continueTraceSpan(header.getTraceInfo(), description);
-  }
-
-  public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto,
-      String description) {
-    TraceScope scope = null;
-    TraceInfo info = fromProto(proto);
-    if (info != null) {
-      scope = Trace.startSpan(description, info);
-    }
-    return scope;
-  }
-
-  public static void checkBlockOpStatus(
-          BlockOpResponseProto response,
-          String logInfo) throws IOException {
-    if (response.getStatus() != Status.SUCCESS) {
-      if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
-        throw new InvalidBlockTokenException(
-          "Got access token error"
-          + ", status message " + response.getMessage()
-          + ", " + logInfo
-        );
-      } else {
-        throw new IOException(
-          "Got error"
-          + ", status message " + response.getMessage()
-          + ", " + logInfo
-        );
-      }
-    }
-  }
-}
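
A caller-side sketch of checkBlockOpStatus (illustrative; it mirrors how
client code consumes a BlockOpResponseProto but is not copied from this diff):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;

    class CheckStatusDemo {
      static void readResponse(Peer peer, String logInfo) throws IOException {
        // Read the varint-delimited response written by the datanode.
        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
            PBHelperClient.vintPrefixed(peer.getInputStream()));
        // Throws InvalidBlockTokenException on ERROR_ACCESS_TOKEN, a plain
        // IOException on any other non-SUCCESS status.
        DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
      }
    }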

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
deleted file mode 100644
index 1f7e378..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Transfer data to/from datanode using a streaming protocol.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface DataTransferProtocol {
-  public static final Logger LOG = LoggerFactory.getLogger(DataTransferProtocol.class);
-  
-  /** Version for data transfers between clients and datanodes.
-   * This should change when the serialization of DatanodeInfo changes,
-   * not just when the protocol changes; that is easy to miss.
-   */
-  /*
-   * Version 28:
-   *    Declare methods in DataTransferProtocol interface.
-   */
-  public static final int DATA_TRANSFER_VERSION = 28;
-
-  /** 
-   * Read a block.
-   * 
-   * @param blk the block being read.
-   * @param blockToken security token for accessing the block.
-   * @param clientName client's name.
-   * @param blockOffset offset of the block.
-   * @param length maximum number of bytes for this read.
-   * @param sendChecksum if false, the DN should skip reading and sending
-   *        checksums
-   * @param cachingStrategy  The caching strategy to use.
-   */
-  public void readBlock(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken,
-      final String clientName,
-      final long blockOffset,
-      final long length,
-      final boolean sendChecksum,
-      final CachingStrategy cachingStrategy) throws IOException;
-
-  /**
-   * Write a block to a datanode pipeline.
-   * The receiver datanode of this call is the next datanode in the pipeline.
-   * The other downstream datanodes are specified by the targets parameter.
-   * Note that the receiver {@link DatanodeInfo} is not required in the
-   * parameter list since the receiver datanode knows its info.  However, the
-   * {@link StorageType} for storing the replica in the receiver datanode is a 
-   * parameter since the receiver datanode may support multiple storage types.
-   *
-   * @param blk the block being written.
-   * @param storageType for storing the replica in the receiver datanode.
-   * @param blockToken security token for accessing the block.
-   * @param clientName client's name.
-   * @param targets other downstream datanodes in the pipeline.
-   * @param targetStorageTypes target {@link StorageType}s corresponding
-   *                           to the target datanodes.
-   * @param source source datanode.
-   * @param stage pipeline stage.
-   * @param pipelineSize the size of the pipeline.
-   * @param minBytesRcvd minimum number of bytes received.
-   * @param maxBytesRcvd maximum number of bytes received.
-   * @param latestGenerationStamp the latest generation stamp of the block.
-   * @param pinning whether to pin the block, so Balancer won't move it.
-   * @param targetPinnings whether to pin the block on target datanode
-   */
-  public void writeBlock(final ExtendedBlock blk,
-      final StorageType storageType, 
-      final Token<BlockTokenIdentifier> blockToken,
-      final String clientName,
-      final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes, 
-      final DatanodeInfo source,
-      final BlockConstructionStage stage,
-      final int pipelineSize,
-      final long minBytesRcvd,
-      final long maxBytesRcvd,
-      final long latestGenerationStamp,
-      final DataChecksum requestedChecksum,
-      final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist,
-      final boolean pinning,
-      final boolean[] targetPinnings) throws IOException;
-  /**
-   * Transfer a block to another datanode.
-   * The block stage must be
-   * either {@link BlockConstructionStage#TRANSFER_RBW}
-   * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
-   * 
-   * @param blk the block being transferred.
-   * @param blockToken security token for accessing the block.
-   * @param clientName client's name.
-   * @param targets target datanodes.
-   */
-  public void transferBlock(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken,
-      final String clientName,
-      final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException;
-
-  /**
-   * Request short circuit access file descriptors from a DataNode.
-   *
-   * @param blk             The block to get file descriptors for.
-   * @param blockToken      Security token for accessing the block.
-   * @param slotId          The shared memory slot id to use, or null 
-   *                          to use no slot id.
-   * @param maxVersion      Maximum version of the block data the client 
-   *                          can understand.
-   * @param supportsReceiptVerification  True if the client supports
-   *                          receipt verification.
-   */
-  public void requestShortCircuitFds(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken,
-      SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
-        throws IOException;
-
-  /**
-   * Release a pair of short-circuit FDs requested earlier.
-   *
-   * @param slotId          SlotID used by the earlier file descriptors.
-   */
-  public void releaseShortCircuitFds(final SlotId slotId) throws IOException;
-
-  /**
-   * Request a short circuit shared memory area from a DataNode.
-   * 
-   * @param clientName       The name of the client.
-   */
-  public void requestShortCircuitShm(String clientName) throws IOException;
-  
-  /**
-   * Receive a block from a source datanode
-   * and then notify the namenode
-   * to remove the copy from the original datanode.
-   * Note that the source datanode and the original datanode can be different.
-   * It is used for balancing purposes.
-   * 
-   * @param blk the block being replaced.
-   * @param storageType the {@link StorageType} for storing the block.
-   * @param blockToken security token for accessing the block.
-   * @param delHint the hint for deleting the block in the original datanode.
-   * @param source the source datanode for receiving the block.
-   */
-  public void replaceBlock(final ExtendedBlock blk,
-      final StorageType storageType, 
-      final Token<BlockTokenIdentifier> blockToken,
-      final String delHint,
-      final DatanodeInfo source) throws IOException;
-
-  /**
-   * Copy a block. 
-   * It is used for balancing purposes.
-   * 
-   * @param blk the block being copied.
-   * @param blockToken security token for accessing the block.
-   */
-  public void copyBlock(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException;
-
-  /**
-   * Get block checksum (MD5 of CRC32).
-   * 
-   * @param blk a block.
-   * @param blockToken security token for accessing the block.
-   * @throws IOException
-   */
-  public void blockChecksum(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException;
-}
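
To make the interface concrete, a minimal read request issued through the
Sender implementation below (a sketch; peer, block, token, and client name
are assumed to be set up elsewhere, and CachingStrategy.newDefaultStrategy()
is assumed available):

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
    import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
    import org.apache.hadoop.security.token.Token;

    class ReadBlockDemo {
      static void requestRead(Peer peer, ExtendedBlock block,
          Token<BlockTokenIdentifier> token, String client) throws IOException {
        DataTransferProtocol sender = new Sender(new DataOutputStream(
            new BufferedOutputStream(peer.getOutputStream())));
        sender.readBlock(block, token, client,
            0L,                   // blockOffset: read from the start
            block.getNumBytes(),  // length: the whole block
            true,                 // sendChecksum: DN verifies and sends checksums
            CachingStrategy.newDefaultStrategy());
      }
    }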

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
deleted file mode 100644
index 3077498..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/** Operation */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public enum Op {
-  WRITE_BLOCK((byte)80),
-  READ_BLOCK((byte)81),
-  READ_METADATA((byte)82),
-  REPLACE_BLOCK((byte)83),
-  COPY_BLOCK((byte)84),
-  BLOCK_CHECKSUM((byte)85),
-  TRANSFER_BLOCK((byte)86),
-  REQUEST_SHORT_CIRCUIT_FDS((byte)87),
-  RELEASE_SHORT_CIRCUIT_FDS((byte)88),
-  REQUEST_SHORT_CIRCUIT_SHM((byte)89),
-  CUSTOM((byte)127);
-
-  /** The code for this operation. */
-  public final byte code;
-  
-  private Op(byte code) {
-    this.code = code;
-  }
-  
-  private static final int FIRST_CODE = values()[0].code;
-  /** Return the object represented by the code. */
-  private static Op valueOf(byte code) {
-    final int i = (code & 0xff) - FIRST_CODE;
-    return i < 0 || i >= values().length? null: values()[i];
-  }
-
-  /** Read from in */
-  public static Op read(DataInput in) throws IOException {
-    return valueOf(in.readByte());
-  }
-
-  /** Write to out */
-  public void write(DataOutput out) throws IOException {
-    out.write(code);
-  }
-}
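
A round-trip sketch of the one-byte opcode encoding (not part of this diff):
read() recovers the constant by offsetting the wire byte against FIRST_CODE,
which is 80 (WRITE_BLOCK).

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.hdfs.protocol.datatransfer.Op;

    class OpDemo {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        Op.READ_BLOCK.write(new DataOutputStream(buf));    // writes the byte 81
        Op parsed = Op.read(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()))); // (81 & 0xff) - 80 = 1
        assert parsed == Op.READ_BLOCK;
      }
    }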

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
deleted file mode 100644
index 2d11dc2..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto;
-
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-
-import org.apache.htrace.Trace;
-import org.apache.htrace.Span;
-
-import com.google.protobuf.Message;
-
-/** Sender */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class Sender implements DataTransferProtocol {
-  private final DataOutputStream out;
-
-  /** Create a sender for DataTransferProtocol with an output stream. */
-  public Sender(final DataOutputStream out) {
-    this.out = out;    
-  }
-
-  /** Initialize an operation. */
-  private static void op(final DataOutput out, final Op op
-      ) throws IOException {
-    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-    op.write(out);
-  }
-
-  private static void send(final DataOutputStream out, final Op opcode,
-      final Message proto) throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
-          + ": " + proto);
-    }
-    op(out, opcode);
-    proto.writeDelimitedTo(out);
-    out.flush();
-  }
-
-  static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) {
-    CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder();
-    if (cachingStrategy.getReadahead() != null) {
-      builder.setReadahead(cachingStrategy.getReadahead().longValue());
-    }
-    if (cachingStrategy.getDropBehind() != null) {
-      builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue());
-    }
-    return builder.build();
-  }
-
-  @Override
-  public void readBlock(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken,
-      final String clientName,
-      final long blockOffset,
-      final long length,
-      final boolean sendChecksum,
-      final CachingStrategy cachingStrategy) throws IOException {
-
-    OpReadBlockProto proto = OpReadBlockProto.newBuilder()
-      .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
-      .setOffset(blockOffset)
-      .setLen(length)
-      .setSendChecksums(sendChecksum)
-      .setCachingStrategy(getCachingStrategy(cachingStrategy))
-      .build();
-
-    send(out, Op.READ_BLOCK, proto);
-  }
-  
-
-  @Override
-  public void writeBlock(final ExtendedBlock blk,
-      final StorageType storageType, 
-      final Token<BlockTokenIdentifier> blockToken,
-      final String clientName,
-      final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes, 
-      final DatanodeInfo source,
-      final BlockConstructionStage stage,
-      final int pipelineSize,
-      final long minBytesRcvd,
-      final long maxBytesRcvd,
-      final long latestGenerationStamp,
-      DataChecksum requestedChecksum,
-      final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist,
-      final boolean pinning,
-      final boolean[] targetPinnings) throws IOException {
-    ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
-        blk, clientName, blockToken);
-    
-    ChecksumProto checksumProto =
-      DataTransferProtoUtil.toProto(requestedChecksum);
-
-    OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
-      .setHeader(header)
-      .setStorageType(PBHelperClient.convertStorageType(storageType))
-      .addAllTargets(PBHelperClient.convert(targets, 1))
-      .addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes, 1))
-      .setStage(toProto(stage))
-      .setPipelineSize(pipelineSize)
-      .setMinBytesRcvd(minBytesRcvd)
-      .setMaxBytesRcvd(maxBytesRcvd)
-      .setLatestGenerationStamp(latestGenerationStamp)
-      .setRequestedChecksum(checksumProto)
-      .setCachingStrategy(getCachingStrategy(cachingStrategy))
-      .setAllowLazyPersist(allowLazyPersist)
-      .setPinning(pinning)
-      .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1));
-    
-    if (source != null) {
-      proto.setSource(PBHelperClient.convertDatanodeInfo(source));
-    }
-
-    send(out, Op.WRITE_BLOCK, proto.build());
-  }
-
-  @Override
-  public void transferBlock(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken,
-      final String clientName,
-      final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException {
-    
-    OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
-      .setHeader(DataTransferProtoUtil.buildClientHeader(
-          blk, clientName, blockToken))
-      .addAllTargets(PBHelperClient.convert(targets))
-      .addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes))
-      .build();
-
-    send(out, Op.TRANSFER_BLOCK, proto);
-  }
-
-  @Override
-  public void requestShortCircuitFds(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken,
-      SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
-        throws IOException {
-    OpRequestShortCircuitAccessProto.Builder builder =
-        OpRequestShortCircuitAccessProto.newBuilder()
-          .setHeader(DataTransferProtoUtil.buildBaseHeader(
-            blk, blockToken)).setMaxVersion(maxVersion);
-    if (slotId != null) {
-      builder.setSlotId(PBHelperClient.convert(slotId));
-    }
-    builder.setSupportsReceiptVerification(supportsReceiptVerification);
-    OpRequestShortCircuitAccessProto proto = builder.build();
-    send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
-  }
-  
-  @Override
-  public void releaseShortCircuitFds(SlotId slotId) throws IOException {
-    ReleaseShortCircuitAccessRequestProto.Builder builder =
-        ReleaseShortCircuitAccessRequestProto.newBuilder().
-        setSlotId(PBHelperClient.convert(slotId));
-    if (Trace.isTracing()) {
-      Span s = Trace.currentSpan();
-      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
-          .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
-    }
-    ReleaseShortCircuitAccessRequestProto proto = builder.build();
-    send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto);
-  }
-
-  @Override
-  public void requestShortCircuitShm(String clientName) throws IOException {
-    ShortCircuitShmRequestProto.Builder builder =
-        ShortCircuitShmRequestProto.newBuilder().
-        setClientName(clientName);
-    if (Trace.isTracing()) {
-      Span s = Trace.currentSpan();
-      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
-          .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
-    }
-    ShortCircuitShmRequestProto proto = builder.build();
-    send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto);
-  }
-  
-  @Override
-  public void replaceBlock(final ExtendedBlock blk,
-      final StorageType storageType, 
-      final Token<BlockTokenIdentifier> blockToken,
-      final String delHint,
-      final DatanodeInfo source) throws IOException {
-    OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
-      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
-      .setStorageType(PBHelperClient.convertStorageType(storageType))
-      .setDelHint(delHint)
-      .setSource(PBHelperClient.convertDatanodeInfo(source))
-      .build();
-    
-    send(out, Op.REPLACE_BLOCK, proto);
-  }
-
-  @Override
-  public void copyBlock(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException {
-    OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
-      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
-      .build();
-    
-    send(out, Op.COPY_BLOCK, proto);
-  }
-
-  @Override
-  public void blockChecksum(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException {
-    OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
-      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
-      .build();
-    
-    send(out, Op.BLOCK_CHECKSUM, proto);
-  }
-}
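
The frame that op() and send() put on the wire is a 2-byte protocol version,
a 1-byte opcode, then one varint-delimited protobuf message. A sketch built
from the same public pieces (ShortCircuitShmRequestProto is used only because
it has the simplest payload; the client name is made up):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
    import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;

    class FramingDemo {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // 2 bytes: 28
        Op.REQUEST_SHORT_CIRCUIT_SHM.write(out);                    // 1 byte: 89
        ShortCircuitShmRequestProto.newBuilder()
            .setClientName("demo-client")
            .build()
            .writeDelimitedTo(out);  // varint length prefix + message bytes
        out.flush();
      }
    }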

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
deleted file mode 100644
index edf658a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocolPB;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedInputStream;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
-import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
-import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Utilities for converting protobuf classes to and from implementation classes
- * and other helper utilities to help in dealing with protobuf.
- *
- * Note that when converting from an internal type to a protobuf type, the
- * converter never returns null for the protobuf type. The check for the
- * internal type being null must be done before calling the convert() method.
- */
-public class PBHelperClient {
-  private PBHelperClient() {
-    /** Hidden constructor */
-  }
-
-  public static ByteString getByteString(byte[] bytes) {
-    return ByteString.copyFrom(bytes);
-  }
-
-  public static ShmId convert(ShortCircuitShmIdProto shmId) {
-    return new ShmId(shmId.getHi(), shmId.getLo());
-  }
-
-  public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
-    return DataChecksum.Type.valueOf(type.getNumber());
-  }
-
-  public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
-    return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
-  }
-
-  public static ExtendedBlockProto convert(final ExtendedBlock b) {
-    if (b == null) return null;
-    return ExtendedBlockProto.newBuilder().
-      setPoolId(b.getBlockPoolId()).
-      setBlockId(b.getBlockId()).
-      setNumBytes(b.getNumBytes()).
-      setGenerationStamp(b.getGenerationStamp()).
-      build();
-  }
-
-  public static TokenProto convert(Token<?> tok) {
-    return TokenProto.newBuilder().
-      setIdentifier(ByteString.copyFrom(tok.getIdentifier())).
-      setPassword(ByteString.copyFrom(tok.getPassword())).
-      setKind(tok.getKind().toString()).
-      setService(tok.getService().toString()).build();
-  }
-
-  public static ShortCircuitShmIdProto convert(ShmId shmId) {
-    return ShortCircuitShmIdProto.newBuilder().
-      setHi(shmId.getHi()).
-      setLo(shmId.getLo()).
-      build();
-
-  }
-
-  public static ShortCircuitShmSlotProto convert(SlotId slotId) {
-    return ShortCircuitShmSlotProto.newBuilder().
-      setShmId(convert(slotId.getShmId())).
-      setSlotIdx(slotId.getSlotIdx()).
-      build();
-  }
-
-  public static DatanodeIDProto convert(DatanodeID dn) {
-    // For wire compatibility with older versions we transmit the StorageID
-    // which is the same as the DatanodeUuid. Since StorageID is a required
-    // field we pass the empty string if the DatanodeUuid is not yet known.
-    return DatanodeIDProto.newBuilder()
-      .setIpAddr(dn.getIpAddr())
-      .setHostName(dn.getHostName())
-      .setXferPort(dn.getXferPort())
-      .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "")
-      .setInfoPort(dn.getInfoPort())
-      .setInfoSecurePort(dn.getInfoSecurePort())
-      .setIpcPort(dn.getIpcPort()).build();
-  }
-
-  public static DatanodeInfoProto.AdminState convert(
-    final DatanodeInfo.AdminStates inAs) {
-    switch (inAs) {
-      case NORMAL: return  DatanodeInfoProto.AdminState.NORMAL;
-      case DECOMMISSION_INPROGRESS:
-        return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS;
-      case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED;
-      default: return DatanodeInfoProto.AdminState.NORMAL;
-    }
-  }
-
-  public static DatanodeInfoProto convert(DatanodeInfo info) {
-    DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
-    if (info.getNetworkLocation() != null) {
-      builder.setLocation(info.getNetworkLocation());
-    }
-    builder
-      .setId(convert((DatanodeID) info))
-      .setCapacity(info.getCapacity())
-      .setDfsUsed(info.getDfsUsed())
-      .setRemaining(info.getRemaining())
-      .setBlockPoolUsed(info.getBlockPoolUsed())
-      .setCacheCapacity(info.getCacheCapacity())
-      .setCacheUsed(info.getCacheUsed())
-      .setLastUpdate(info.getLastUpdate())
-      .setLastUpdateMonotonic(info.getLastUpdateMonotonic())
-      .setXceiverCount(info.getXceiverCount())
-      .setAdminState(convert(info.getAdminState()))
-      .build();
-    return builder.build();
-  }
-
-  public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
-    DatanodeInfo[] dnInfos) {
-    return convert(dnInfos, 0);
-  }
-
-  /**
-   * Copy from {@code dnInfos} to a target list of the same size starting at
-   * {@code startIdx}.
-   */
-  public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
-    DatanodeInfo[] dnInfos, int startIdx) {
-    if (dnInfos == null)
-      return null;
-    ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists
-      .newArrayListWithCapacity(dnInfos.length);
-    for (int i = startIdx; i < dnInfos.length; i++) {
-      protos.add(convert(dnInfos[i]));
-    }
-    return protos;
-  }
-
-  public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
-    List<Boolean> pinnings = new ArrayList<>();
-    if (targetPinnings == null) {
-      pinnings.add(Boolean.FALSE);
-    } else {
-      for (; idx < targetPinnings.length; ++idx) {
-        pinnings.add(targetPinnings[idx]);
-      }
-    }
-    return pinnings;
-  }
-
-  static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
-    if (di == null) return null;
-    return convert(di);
-  }
-
-  public static StorageTypeProto convertStorageType(StorageType type) {
-    switch(type) {
-      case DISK:
-        return StorageTypeProto.DISK;
-      case SSD:
-        return StorageTypeProto.SSD;
-      case ARCHIVE:
-        return StorageTypeProto.ARCHIVE;
-      case RAM_DISK:
-        return StorageTypeProto.RAM_DISK;
-      default:
-        throw new IllegalStateException(
-          "BUG: StorageType not found, type=" + type);
-    }
-  }
-
-  public static StorageType convertStorageType(StorageTypeProto type) {
-    switch(type) {
-      case DISK:
-        return StorageType.DISK;
-      case SSD:
-        return StorageType.SSD;
-      case ARCHIVE:
-        return StorageType.ARCHIVE;
-      case RAM_DISK:
-        return StorageType.RAM_DISK;
-      default:
-        throw new IllegalStateException(
-          "BUG: StorageTypeProto not found, type=" + type);
-    }
-  }
-
-  public static List<StorageTypeProto> convertStorageTypes(
-    StorageType[] types) {
-    return convertStorageTypes(types, 0);
-  }
-
-  public static List<StorageTypeProto> convertStorageTypes(
-    StorageType[] types, int startIdx) {
-    if (types == null) {
-      return null;
-    }
-    final List<StorageTypeProto> protos = new ArrayList<>(
-      types.length);
-    for (int i = startIdx; i < types.length; ++i) {
-      protos.add(PBHelperClient.convertStorageType(types[i]));
-    }
-    return protos;
-  }
-
-  public static InputStream vintPrefixed(final InputStream input)
-    throws IOException {
-    final int firstByte = input.read();
-    if (firstByte == -1) {
-      throw new EOFException("Premature EOF: no length prefix available");
-    }
-
-    int size = CodedInputStream.readRawVarint32(firstByte, input);
-    assert size >= 0;
-    return new ExactSizeInputStream(input, size);
-  }
-}
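
For context, vintPrefixed() above implements the standard protobuf varint
length-prefix framing: read one byte, let CodedInputStream decode the rest of
the 32-bit varint, then bound the stream to exactly that many payload bytes.
Below is a minimal, self-contained sketch of the same framing; the class and
method names are hypothetical, and it reads the frame into a byte array
instead of wrapping the stream in ExactSizeInputStream:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import com.google.protobuf.CodedInputStream;

public class VarintFramingDemo {
  // Encode a 32-bit value as a protobuf varint: 7 bits per byte,
  // high bit set on every byte except the last.
  static void writeVarint32(OutputStream out, int value) throws IOException {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  // Read one varint-length-prefixed frame, as vintPrefixed() does,
  // but into a byte array rather than a bounded InputStream.
  static byte[] readFrame(InputStream in) throws IOException {
    int firstByte = in.read();
    if (firstByte == -1) {
      throw new EOFException("Premature EOF: no length prefix available");
    }
    int size = CodedInputStream.readRawVarint32(firstByte, in);
    byte[] frame = new byte[size];
    new DataInputStream(in).readFully(frame);
    return frame;
  }

  public static void main(String[] args) throws IOException {
    byte[] payload = "ShortCircuitShmResponse".getBytes("UTF-8");
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    writeVarint32(wire, payload.length);
    wire.write(payload);

    byte[] decoded = readFrame(new ByteArrayInputStream(wire.toByteArray()));
    System.out.println(new String(decoded, "UTF-8"));
  }
}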

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
deleted file mode 100644
index 2fa86fa..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.security.token.block;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Access token verification failed.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class InvalidBlockTokenException extends IOException {
-  private static final long serialVersionUID = 168L;
-
-  public InvalidBlockTokenException() {
-    super();
-  }
-
-  public InvalidBlockTokenException(String msg) {
-    super(msg);
-  }
-}
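
The exception carries no extra state; its value is in the type, which lets a
caller distinguish a token-verification failure from other I/O errors. A small
sketch of that catch-and-distinguish pattern, with a hypothetical stub
standing in for the real DataNode-side verification:

import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;

public class TokenFailureDemo {
  // Hypothetical stub: a real DataNode would check the token's expiry,
  // block id, and access mode before serving a read.
  static void verifyToken(boolean tokenOk) throws InvalidBlockTokenException {
    if (!tokenOk) {
      throw new InvalidBlockTokenException("Block token verification failed");
    }
  }

  public static void main(String[] args) {
    try {
      verifyToken(false);
    } catch (InvalidBlockTokenException e) {
      // Recoverable: a client would typically refetch block locations
      // (and thereby fresh tokens) and retry, rather than fail the read.
      System.out.println("invalid token: " + e.getMessage());
    }
  }
}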

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
deleted file mode 100644
index 215df13..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode;
-
-/**
- * The caching strategy we should use for an HDFS read or write operation.
- */
-public class CachingStrategy {
-  private final Boolean dropBehind; // null = use server defaults
-  private final Long readahead; // null = use server defaults
-  
-  public static CachingStrategy newDefaultStrategy() {
-    return new CachingStrategy(null, null);
-  }
-
-  public static CachingStrategy newDropBehind() {
-    return new CachingStrategy(true, null);
-  }
-
-  public static class Builder {
-    private Boolean dropBehind;
-    private Long readahead;
-
-    public Builder(CachingStrategy prev) {
-      this.dropBehind = prev.dropBehind;
-      this.readahead = prev.readahead;
-    }
-
-    public Builder setDropBehind(Boolean dropBehind) {
-      this.dropBehind = dropBehind;
-      return this;
-    }
-
-    public Builder setReadahead(Long readahead) {
-      this.readahead = readahead;
-      return this;
-    }
-
-    public CachingStrategy build() {
-      return new CachingStrategy(dropBehind, readahead);
-    }
-  }
-
-  public CachingStrategy(Boolean dropBehind, Long readahead) {
-    this.dropBehind = dropBehind;
-    this.readahead = readahead;
-  }
-
-  public Boolean getDropBehind() {
-    return dropBehind;
-  }
-  
-  public Long getReadahead() {
-    return readahead;
-  }
-
-  @Override
-  public String toString() {
-    return "CachingStrategy(dropBehind=" + dropBehind +
-        ", readahead=" + readahead + ")";
-  }
-}
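
Both fields of CachingStrategy are deliberately boxed (Boolean/Long) so that
null can mean "defer to the server default", and the Builder copies an
existing strategy before overriding individual fields. A minimal usage sketch
against the class exactly as shown above:

import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;

public class CachingStrategyDemo {
  public static void main(String[] args) {
    // Start from server defaults (both fields null).
    CachingStrategy defaults = CachingStrategy.newDefaultStrategy();

    // Derive a copy that enables drop-behind and requests 4 MB of readahead;
    // any field left unset (null) still means "use the server default".
    CachingStrategy tuned = new CachingStrategy.Builder(defaults)
        .setDropBehind(true)
        .setReadahead(4L * 1024 * 1024)
        .build();

    // Prints: CachingStrategy(dropBehind=true, readahead=4194304)
    System.out.println(tuned);
  }
}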

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
deleted file mode 100644
index 81cc68d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.hdfs.net.DomainPeer;
-import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.EndpointShmManager;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.net.unix.DomainSocketWatcher;
-
-import com.google.common.base.Preconditions;
-
-/**
- * DfsClientShm is a subclass of ShortCircuitShm which is used by the
- * DfsClient.
- * When the UNIX domain socket associated with this shared memory segment
- * closes unexpectedly, we mark the slots inside this segment as disconnected.
- * ShortCircuitReplica objects that contain disconnected slots are stale,
- * and will not be used to service new reads or mmap operations.
- * However, in-progress read or mmap operations will continue to proceed.
- * Once the last slot is deallocated, the segment can be safely munmapped.
- *
- * Slots may also become stale because the associated replica has been deleted
- * on the DataNode.  In this case, the DataNode will clear the 'valid' bit.
- * The client will then see these slots as stale (see
- * {@link ShortCircuitReplica#isStale}).
- */
-public class DfsClientShm extends ShortCircuitShm
-    implements DomainSocketWatcher.Handler {
-  /**
-   * The EndpointShmManager associated with this shared memory segment.
-   */
-  private final EndpointShmManager manager;
-
-  /**
-   * The UNIX domain socket associated with this DfsClientShm.
-   * We rely on the DomainSocketWatcher to close the socket associated with
-   * this DomainPeer when necessary.
-   */
-  private final DomainPeer peer;
-
-  /**
-   * True if this shared memory segment has lost its connection to the
-   * DataNode.
-   *
-   * {@link DfsClientShm#handle} sets this to true.
-   */
-  private boolean disconnected = false;
-
-  DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
-      DomainPeer peer) throws IOException {
-    super(shmId, stream);
-    this.manager = manager;
-    this.peer = peer;
-  }
-
-  public EndpointShmManager getEndpointShmManager() {
-    return manager;
-  }
-
-  public DomainPeer getPeer() {
-    return peer;
-  }
-
-  /**
-   * Determine if the shared memory segment is disconnected from the DataNode.
-   *
-   * This must be called with the DfsClientShmManager lock held.
-   *
-   * @return   True if the shared memory segment is stale.
-   */
-  public synchronized boolean isDisconnected() {
-    return disconnected;
-  }
-
-  /**
-   * Handle the closure of the UNIX domain socket associated with this shared
-   * memory segment by marking this segment as stale.
-   *
-   * If there are no slots associated with this shared memory segment, it will
-   * be freed immediately in this function.
-   */
-  @Override
-  public boolean handle(DomainSocket sock) {
-    manager.unregisterShm(getShmId());
-    synchronized (this) {
-      Preconditions.checkState(!disconnected);
-      disconnected = true;
-      boolean hadSlots = false;
-      for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) {
-        Slot slot = iter.next();
-        slot.makeInvalid();
-        hadSlots = true;
-      }
-      if (!hadSlots) {
-        free();
-      }
-    }
-    return true;
-  }
-}
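
The handle() callback above is the heart of the staleness protocol: when the
DomainSocketWatcher reports that the socket closed, every slot is marked
invalid (in-flight reads may finish, but no new ones start), and a segment
with no allocated slots is freed on the spot. A stripped-down, dependency-free
model of that logic, with hypothetical names (ToySegment and ToySlot stand in
for the shm and its slots):

import java.util.ArrayList;
import java.util.List;

public class DisconnectDemo {
  // Toy stand-in for DfsClientShm#handle: on socket closure, mark every
  // slot invalid; if no slots were allocated, free (munmap) immediately.
  static class ToySegment {
    private final List<ToySlot> slots = new ArrayList<ToySlot>();
    private boolean disconnected = false;
    boolean freed = false;

    ToySlot alloc() {
      ToySlot s = new ToySlot();
      slots.add(s);
      return s;
    }

    synchronized void onSocketClosed() {
      if (disconnected) {
        throw new IllegalStateException("already disconnected");
      }
      disconnected = true;
      boolean hadSlots = false;
      for (ToySlot slot : slots) {
        slot.valid = false;   // in-flight reads may finish; no new ones start
        hadSlots = true;
      }
      if (!hadSlots) {
        freed = true;         // nothing outstanding: safe to free right away
      }
    }
  }

  static class ToySlot {
    boolean valid = true;
  }

  public static void main(String[] args) {
    ToySegment busy = new ToySegment();
    ToySlot slot = busy.alloc();
    busy.onSocketClosed();
    System.out.println("slot valid=" + slot.valid + ", freed=" + busy.freed);

    ToySegment idle = new ToySegment();
    idle.onSocketClosed();
    System.out.println("idle segment freed=" + idle.freed);
  }
}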

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
deleted file mode 100644
index f70398a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.net.DomainPeer;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.net.unix.DomainSocketWatcher;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manages short-circuit memory segments for an HDFS client.
- *
- * Clients are responsible for requesting and releasing shared memory segments used
- * for communicating with the DataNode. The client will try to allocate new slots
- * in the set of existing segments, falling back to getting a new segment from the
- * DataNode via {@link DataTransferProtocol#requestShortCircuitFds}.
- *
- * The counterpart to this class on the DataNode is {@link ShortCircuitRegistry}.
- * See {@link ShortCircuitRegistry} for more information on the communication protocol.
- */
-@InterfaceAudience.Private
-public class DfsClientShmManager implements Closeable {
-  private static final Logger LOG = LoggerFactory.getLogger(
-      DfsClientShmManager.class);
-
-  /**
-   * Manages short-circuit memory segments that pertain to a given DataNode.
-   */
-  class EndpointShmManager {
-    /**
-     * The datanode we're managing.
-     */
-    private final DatanodeInfo datanode;
-
-    /**
-     * Shared memory segments which have no empty slots.
-     *
-     * Protected by the manager lock.
-     */
-    private final TreeMap<ShmId, DfsClientShm> full =
-        new TreeMap<ShmId, DfsClientShm>();
-
-    /**
-     * Shared memory segments which have at least one empty slot.
-     *
-     * Protected by the manager lock.
-     */
-    private final TreeMap<ShmId, DfsClientShm> notFull =
-        new TreeMap<ShmId, DfsClientShm>();
-
-    /**
-     * True if this datanode doesn't support short-circuit shared memory
-     * segments.
-     *
-     * Protected by the manager lock.
-     */
-    private boolean disabled = false;
-
-    /**
-     * True if we're in the process of loading a shared memory segment from
-     * this DataNode.
-     *
-     * Protected by the manager lock.
-     */
-    private boolean loading = false;
-
-    EndpointShmManager(DatanodeInfo datanode) {
-      this.datanode = datanode;
-    }
-
-    /**
-     * Pull a slot out of a preexisting shared memory segment.
-     *
-     * Must be called with the manager lock held.
-     *
-     * @param blockId     The blockId to put inside the Slot object.
-     *
-     * @return            null if none of our shared memory segments contain a
-     *                      free slot; the slot object otherwise.
-     */
-    private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
-      if (notFull.isEmpty()) {
-        return null;
-      }
-      Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
-      DfsClientShm shm = entry.getValue();
-      ShmId shmId = shm.getShmId();
-      Slot slot = shm.allocAndRegisterSlot(blockId);
-      if (shm.isFull()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
-              " out of " + shm);
-        }
-        DfsClientShm removedShm = notFull.remove(shmId);
-        Preconditions.checkState(removedShm == shm);
-        full.put(shmId, shm);
-      } else {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
-              " out of " + shm);
-        }
-      }
-      return slot;
-    }
-
-    /**
-     * Ask the DataNode for a new shared memory segment.  This function must be
-     * called with the manager lock held.  We will release the lock while
-     * communicating with the DataNode.
-     *
-     * @param clientName    The current client name.
-     * @param peer          The peer to use to talk to the DataNode.
-     *
-     * @return              Null if the DataNode does not support shared memory
-     *                        segments, or experienced an error creating the
-     *                        shm.  The shared memory segment itself on success.
-     * @throws IOException  If there was an error communicating over the socket.
-     *                        We will not throw an IOException unless the socket
-     *                        itself (or the network) is the problem.
-     */
-    private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
-        throws IOException {
-      final DataOutputStream out = 
-          new DataOutputStream(
-              new BufferedOutputStream(peer.getOutputStream()));
-      new Sender(out).requestShortCircuitShm(clientName);
-      ShortCircuitShmResponseProto resp = 
-          ShortCircuitShmResponseProto.parseFrom(
-            PBHelperClient.vintPrefixed(peer.getInputStream()));
-      String error = resp.hasError() ? resp.getError() : "(unknown)";
-      switch (resp.getStatus()) {
-      case SUCCESS:
-        DomainSocket sock = peer.getDomainSocket();
-        byte[] buf = new byte[1];
-        FileInputStream[] fis = new FileInputStream[1];
-        if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
-          throw new EOFException("got EOF while trying to transfer the " +
-              "file descriptor for the shared memory segment.");
-        }
-        if (fis[0] == null) {
-          throw new IOException("the datanode " + datanode + " failed to " +
-              "pass a file descriptor for the shared memory segment.");
-        }
-        try {
-          DfsClientShm shm = 
-              new DfsClientShm(PBHelperClient.convert(resp.getId()),
-                  fis[0], this, peer);
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": createNewShm: created " + shm);
-          }
-          return shm;
-        } finally {
-          try {
-            fis[0].close();
-          } catch (Throwable e) {
-            LOG.debug("Exception in closing " + fis[0], e);
-          }
-        }
-      case ERROR_UNSUPPORTED:
-        // The DataNode just does not support short-circuit shared memory
-        // access, and we should stop asking.
-        LOG.info(this + ": datanode does not support short-circuit " +
-            "shared memory access: " + error);
-        disabled = true;
-        return null;
-      default:
-        // The datanode experienced some kind of unexpected error when trying to
-        // create the short-circuit shared memory segment.
-        LOG.warn(this + ": error requesting short-circuit shared memory " +
-            "access: " + error);
-        return null;
-      }
-    }
-
-    /**
-     * Allocate a new shared memory slot connected to this datanode.
-     *
-     * Must be called with the EndpointShmManager lock held.
-     *
-     * @param peer          The peer to use to talk to the DataNode.
-     * @param usedPeer      (out param) Will be set to true if we used the peer.
-     *                        When a peer is used, the manager takes ownership of
-     *                        it and the caller must not close it.
-     * @param clientName    The client name.
-     * @param blockId       The block ID to use.
-     * @return              null if the DataNode does not support shared memory
-     *                        segments, or experienced an error creating the shm;
-     *                        the newly allocated slot on success.
-     * @throws IOException  If there was an error communicating over the socket.
-     */
-    Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
-        String clientName, ExtendedBlockId blockId) throws IOException {
-      while (true) {
-        if (closed) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": the DfsClientShmManager has been closed.");
-          }
-          return null;
-        }
-        if (disabled) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": shared memory segment access is disabled.");
-          }
-          return null;
-        }
-        // Try to use an existing slot.
-        Slot slot = allocSlotFromExistingShm(blockId);
-        if (slot != null) {
-          return slot;
-        }
-        // There are no free slots.  If someone is loading more slots, wait
-        // for that to finish.
-        if (loading) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": waiting for loading to finish...");
-          }
-          finishedLoading.awaitUninterruptibly();
-        } else {
-          // Otherwise, load the slot ourselves.
-          loading = true;
-          lock.unlock();
-          DfsClientShm shm;
-          try {
-            shm = requestNewShm(clientName, peer);
-            if (shm == null) continue;
-            // See #{DfsClientShmManager#domainSocketWatcher} for details
-            // about why we do this before retaking the manager lock.
-            domainSocketWatcher.add(peer.getDomainSocket(), shm);
-            // The DomainPeer is now our responsibility, and should not be
-            // closed by the caller.
-            usedPeer.setValue(true);
-          } finally {
-            lock.lock();
-            loading = false;
-            finishedLoading.signalAll();
-          }
-          if (shm.isDisconnected()) {
-            // If the peer closed immediately after the shared memory segment
-            // was created, the DomainSocketWatcher callback might already have
-            // fired and marked the shm as disconnected.  In this case, we
-            // obviously don't want to add the SharedMemorySegment to our list
-            // of valid not-full segments.
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(this + ": the UNIX domain socket associated with " +
-                  "this short-circuit memory closed before we could make " +
-                  "use of the shm.");
-            }
-          } else {
-            notFull.put(shm.getShmId(), shm);
-          }
-        }
-      }
-    }
-    
-    /**
-     * Stop tracking a slot.
-     *
-     * Must be called with the EndpointShmManager lock held.
-     *
-     * @param slot          The slot to release.
-     */
-    void freeSlot(Slot slot) {
-      DfsClientShm shm = (DfsClientShm)slot.getShm();
-      shm.unregisterSlot(slot.getSlotIdx());
-      if (shm.isDisconnected()) {
-        // Stale shared memory segments should not be tracked here.
-        Preconditions.checkState(!full.containsKey(shm.getShmId()));
-        Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
-        if (shm.isEmpty()) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": freeing empty stale " + shm);
-          }
-          shm.free();
-        }
-      } else {
-        ShmId shmId = shm.getShmId();
-        full.remove(shmId); // The shm can't be full if we just freed a slot.
-        if (shm.isEmpty()) {
-          notFull.remove(shmId);
-  
-          // If the shared memory segment is now empty, we call shutdown(2) on
-          // the UNIX domain socket associated with it.  The DomainSocketWatcher,
-          // which is watching this socket, will call DfsClientShm#handle,
-          // cleaning up this shared memory segment.
-          //
-          // See #{DfsClientShmManager#domainSocketWatcher} for details about why
-          // we don't want to call DomainSocketWatcher#remove directly here.
-          //
-          // Note that we could experience 'fragmentation' here, where the
-          // DFSClient allocates a bunch of slots in different shared memory
-          // segments, and then frees most of them, but never fully empties out
-          // any segment.  We make some attempt to avoid this fragmentation by
-          // always allocating new slots out of the shared memory segment with the
-          // lowest ID, but it could still occur.  In most workloads,
-          // fragmentation should not be a major concern, since it doesn't impact
-          // peak file descriptor usage or the speed of allocation.
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": shutting down UNIX domain socket for " +
-                "empty " + shm);
-          }
-          shutdown(shm);
-        } else {
-          notFull.put(shmId, shm);
-        }
-      }
-    }
-    
-    /**
-     * Unregister a shared memory segment.
-     *
-     * Once a segment is unregistered, we will not allocate any more slots
-     * inside that segment.
-     *
-     * The DomainSocketWatcher calls this while holding the DomainSocketWatcher
-     * lock.
-     *
-     * @param shmId         The ID of the shared memory segment to unregister.
-     */
-    void unregisterShm(ShmId shmId) {
-      lock.lock();
-      try {
-        full.remove(shmId);
-        notFull.remove(shmId);
-      } finally {
-        lock.unlock();
-      }
-    }
-
-    @Override
-    public String toString() {
-      return String.format("EndpointShmManager(%s, parent=%s)",
-          datanode, DfsClientShmManager.this);
-    }
-
-    PerDatanodeVisitorInfo getVisitorInfo() {
-      return new PerDatanodeVisitorInfo(full, notFull, disabled);
-    }
-
-    final void shutdown(DfsClientShm shm) {
-      try {
-        shm.getPeer().getDomainSocket().shutdown();
-      } catch (IOException e) {
-        LOG.warn(this + ": error shutting down shm: got IOException calling " +
-            "shutdown(SHUT_RDWR)", e);
-      }
-    }
-  }
-
-  private boolean closed = false;
-
-  private final ReentrantLock lock = new ReentrantLock();
-
-  /**
-   * A condition variable which is signalled when we finish loading a segment
-   * from the Datanode.
-   */
-  private final Condition finishedLoading = lock.newCondition();
-
-  /**
-   * Information about each Datanode.
-   */
-  private final HashMap<DatanodeInfo, EndpointShmManager> datanodes =
-      new HashMap<DatanodeInfo, EndpointShmManager>(1);
-  
-  /**
-   * The DomainSocketWatcher which keeps track of the UNIX domain socket
-   * associated with each shared memory segment.
-   *
-   * Note: because the DomainSocketWatcher makes callbacks into this
-   * DfsClientShmManager object, you MUST NOT attempt to take the
-   * DomainSocketWatcher lock while holding the DfsClientShmManager lock,
-   * or else deadlock might result.  This means that most DomainSocketWatcher
-   * methods are off-limits unless you release the manager lock first.
-   */
-  private final DomainSocketWatcher domainSocketWatcher;
-  
-  DfsClientShmManager(int interruptCheckPeriodMs) throws IOException {
-    this.domainSocketWatcher = new DomainSocketWatcher(interruptCheckPeriodMs,
-        "client");
-  }
-  
-  public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
-      MutableBoolean usedPeer, ExtendedBlockId blockId,
-      String clientName) throws IOException {
-    lock.lock();
-    try {
-      if (closed) {
-        LOG.trace(this + ": the DfsClientShmManager is closed.");
-        return null;
-      }
-      EndpointShmManager shmManager = datanodes.get(datanode);
-      if (shmManager == null) {
-        shmManager = new EndpointShmManager(datanode);
-        datanodes.put(datanode, shmManager);
-      }
-      return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
-    } finally {
-      lock.unlock();
-    }
-  }
-  
-  public void freeSlot(Slot slot) {
-    lock.lock();
-    try {
-      DfsClientShm shm = (DfsClientShm)slot.getShm();
-      shm.getEndpointShmManager().freeSlot(slot);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @VisibleForTesting
-  public static class PerDatanodeVisitorInfo {
-    public final TreeMap<ShmId, DfsClientShm> full;
-    public final TreeMap<ShmId, DfsClientShm> notFull;
-    public final boolean disabled;
-
-    PerDatanodeVisitorInfo(TreeMap<ShmId, DfsClientShm> full,
-        TreeMap<ShmId, DfsClientShm> notFull, boolean disabled) {
-      this.full = full;
-      this.notFull = notFull;
-      this.disabled = disabled;
-    }
-  }
-
-  @VisibleForTesting
-  public interface Visitor {
-    void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
-        throws IOException;
-  }
-
-  @VisibleForTesting
-  public void visit(Visitor visitor) throws IOException {
-    lock.lock();
-    try {
-      HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info = 
-          new HashMap<DatanodeInfo, PerDatanodeVisitorInfo>();
-      for (Entry<DatanodeInfo, EndpointShmManager> entry :
-            datanodes.entrySet()) {
-        info.put(entry.getKey(), entry.getValue().getVisitorInfo());
-      }
-      visitor.visit(info);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Close the DfsClientShmManager.
-   */
-  @Override
-  public void close() throws IOException {
-    lock.lock();
-    try {
-      if (closed) return;
-      closed = true;
-    } finally {
-      lock.unlock();
-    }
-    // When closed, the domainSocketWatcher will issue callbacks that mark
-    // all the outstanding DfsClientShm segments as stale.
-    try {
-      domainSocketWatcher.close();
-    } catch (Throwable e) {
-      LOG.debug("Exception in closing " + domainSocketWatcher, e);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return String.format("ShortCircuitShmManager(%08x)",
-        System.identityHashCode(this));
-  }
-
-  @VisibleForTesting
-  public DomainSocketWatcher getDomainSocketWatcher() {
-    return domainSocketWatcher;
-  }
-}
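
The subtle part of allocSlot() above is the loading flag: exactly one caller
releases the manager lock to request a new segment from the DataNode, while
concurrent callers block on the finishedLoading condition rather than issuing
duplicate requests, then re-check for free slots. A stripped-down,
self-contained model of that pattern under hypothetical names (and using an
interruptible await, where the original awaits uninterruptibly):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class LoaderDemo {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition finishedLoading = lock.newCondition();
  private final Deque<String> pool = new ArrayDeque<String>();
  private boolean loading = false;

  String alloc() throws InterruptedException {
    lock.lock();
    try {
      while (true) {
        String item = pool.poll();
        if (item != null) {
          return item;                        // fast path: reuse existing
        }
        if (loading) {
          finishedLoading.await();            // someone else is fetching
        } else {
          loading = true;
          lock.unlock();                      // drop the lock around slow I/O
          String fetched;
          try {
            fetched = slowFetch();
          } finally {
            lock.lock();
            loading = false;
            finishedLoading.signalAll();
          }
          pool.add(fetched);                  // next loop iteration hands it out
        }
      }
    } finally {
      lock.unlock();
    }
  }

  private String slowFetch() throws InterruptedException {
    Thread.sleep(100);                        // stand-in for the DataNode RPC
    return "segment";
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(new LoaderDemo().alloc());
  }
}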
