Revert "HDFS-8934. Move ShortCircuitShm to hdfs-client. Contributed by Mingliang Liu."
This reverts commit 8e4afa3a671583c95263218b85cf6bfbc1e43635. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a727c6db Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a727c6db Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a727c6db Branch: refs/heads/branch-2 Commit: a727c6db0530aff5dcccb4181ba83e93e543ac5c Parents: 87d0133 Author: Andrew Wang <w...@apache.org> Authored: Mon Aug 24 11:51:46 2015 -0700 Committer: Andrew Wang <w...@apache.org> Committed: Mon Aug 24 11:51:46 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/ExtendedBlockId.java | 82 --- .../org/apache/hadoop/hdfs/net/DomainPeer.java | 132 ---- .../java/org/apache/hadoop/hdfs/net/Peer.java | 123 ---- .../datatransfer/BlockConstructionStage.java | 62 -- .../datatransfer/DataTransferProtoUtil.java | 146 ----- .../datatransfer/DataTransferProtocol.java | 202 ------ .../hadoop/hdfs/protocol/datatransfer/Op.java | 66 -- .../hdfs/protocol/datatransfer/Sender.java | 261 -------- .../hadoop/hdfs/protocolPB/PBHelperClient.java | 254 -------- .../token/block/InvalidBlockTokenException.java | 41 -- .../hdfs/server/datanode/CachingStrategy.java | 76 --- .../hadoop/hdfs/shortcircuit/DfsClientShm.java | 119 ---- .../hdfs/shortcircuit/DfsClientShmManager.java | 522 --------------- .../hdfs/shortcircuit/ShortCircuitShm.java | 647 ------------------- .../hadoop/hdfs/util/ExactSizeInputStream.java | 125 ---- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 - .../apache/hadoop/hdfs/BlockReaderFactory.java | 4 +- .../java/org/apache/hadoop/hdfs/DFSClient.java | 10 +- .../org/apache/hadoop/hdfs/DataStreamer.java | 6 +- .../org/apache/hadoop/hdfs/ExtendedBlockId.java | 82 +++ .../apache/hadoop/hdfs/RemoteBlockReader.java | 4 +- .../apache/hadoop/hdfs/RemoteBlockReader2.java | 4 +- .../org/apache/hadoop/hdfs/net/DomainPeer.java | 132 ++++ .../java/org/apache/hadoop/hdfs/net/Peer.java | 123 ++++ .../datatransfer/BlockConstructionStage.java | 62 ++ .../datatransfer/DataTransferProtoUtil.java | 148 +++++ .../datatransfer/DataTransferProtocol.java | 201 ++++++ .../hadoop/hdfs/protocol/datatransfer/Op.java | 66 ++ .../hdfs/protocol/datatransfer/PipelineAck.java | 2 +- .../hdfs/protocol/datatransfer/Receiver.java | 7 +- .../hdfs/protocol/datatransfer/Sender.java | 261 ++++++++ .../datatransfer/sasl/DataTransferSaslUtil.java | 2 +- ...tDatanodeProtocolServerSideTranslatorPB.java | 2 +- .../ClientDatanodeProtocolTranslatorPB.java | 6 +- ...tNamenodeProtocolServerSideTranslatorPB.java | 6 +- .../ClientNamenodeProtocolTranslatorPB.java | 28 +- .../DatanodeProtocolClientSideTranslatorPB.java | 4 +- .../InterDatanodeProtocolTranslatorPB.java | 2 +- .../NamenodeProtocolTranslatorPB.java | 2 +- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 228 ++++++- .../token/block/InvalidBlockTokenException.java | 41 ++ .../hadoop/hdfs/server/balancer/Dispatcher.java | 2 +- .../hdfs/server/datanode/CachingStrategy.java | 76 +++ .../hadoop/hdfs/server/datanode/DataNode.java | 4 +- .../hdfs/server/datanode/DataXceiver.java | 14 +- .../server/namenode/FSImageFormatPBINode.java | 5 +- .../hadoop/hdfs/shortcircuit/DfsClientShm.java | 119 ++++ .../hdfs/shortcircuit/DfsClientShmManager.java | 514 +++++++++++++++ .../hdfs/shortcircuit/ShortCircuitCache.java | 4 +- .../hdfs/shortcircuit/ShortCircuitShm.java | 646 ++++++++++++++++++ .../hadoop/hdfs/util/ExactSizeInputStream.java | 125 ++++ .../hadoop/hdfs/protocolPB/TestPBHelper.java | 20 +- 52 files changed, 2873 insertions(+), 2949 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java deleted file mode 100644 index 7b9e8e3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.commons.lang.builder.EqualsBuilder; -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; - -/** - * An immutable key which identifies a block. - */ -@InterfaceAudience.Private -final public class ExtendedBlockId { - /** - * The block ID for this block. - */ - private final long blockId; - - /** - * The block pool ID for this block. - */ - private final String bpId; - - public static ExtendedBlockId fromExtendedBlock(ExtendedBlock block) { - return new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); - } - - public ExtendedBlockId(long blockId, String bpId) { - this.blockId = blockId; - this.bpId = bpId; - } - - public long getBlockId() { - return this.blockId; - } - - public String getBlockPoolId() { - return this.bpId; - } - - @Override - public boolean equals(Object o) { - if ((o == null) || (o.getClass() != this.getClass())) { - return false; - } - ExtendedBlockId other = (ExtendedBlockId)o; - return new EqualsBuilder(). - append(blockId, other.blockId). - append(bpId, other.bpId). - isEquals(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(). - append(this.blockId). - append(this.bpId). - toHashCode(); - } - - @Override - public String toString() { - return new StringBuilder().append(blockId). - append("_").append(bpId).toString(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java deleted file mode 100644 index 4792b0e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.net; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.channels.ReadableByteChannel; - -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.classification.InterfaceAudience; - -/** - * Represents a peer that we communicate with by using blocking I/O - * on a UNIX domain socket. - */ -@InterfaceAudience.Private -public class DomainPeer implements Peer { - private final DomainSocket socket; - private final OutputStream out; - private final InputStream in; - private final ReadableByteChannel channel; - - public DomainPeer(DomainSocket socket) { - this.socket = socket; - this.out = socket.getOutputStream(); - this.in = socket.getInputStream(); - this.channel = socket.getChannel(); - } - - @Override - public ReadableByteChannel getInputStreamChannel() { - return channel; - } - - @Override - public void setReadTimeout(int timeoutMs) throws IOException { - socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs); - } - - @Override - public int getReceiveBufferSize() throws IOException { - return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); - } - - @Override - public boolean getTcpNoDelay() throws IOException { - /* No TCP, no TCP_NODELAY. */ - return false; - } - - @Override - public void setWriteTimeout(int timeoutMs) throws IOException { - socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs); - } - - @Override - public boolean isClosed() { - return !socket.isOpen(); - } - - @Override - public void close() throws IOException { - socket.close(); - } - - @Override - public String getRemoteAddressString() { - return "unix:" + socket.getPath(); - } - - @Override - public String getLocalAddressString() { - return "<local>"; - } - - @Override - public InputStream getInputStream() throws IOException { - return in; - } - - @Override - public OutputStream getOutputStream() throws IOException { - return out; - } - - @Override - public boolean isLocal() { - /* UNIX domain sockets can only be used for local communication. */ - return true; - } - - @Override - public String toString() { - return "DomainPeer(" + getRemoteAddressString() + ")"; - } - - @Override - public DomainSocket getDomainSocket() { - return socket; - } - - @Override - public boolean hasSecureChannel() { - // - // Communication over domain sockets is assumed to be secure, since it - // doesn't pass over any network. We also carefully control the privileges - // that can be used on the domain socket inode and its parent directories. - // See #{java.org.apache.hadoop.net.unix.DomainSocket#validateSocketPathSecurity0} - // for details. - // - // So unless you are running as root or the hdfs superuser, you cannot - // launch a man-in-the-middle attach on UNIX domain socket traffic. - // - return true; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java deleted file mode 100644 index 42cf287..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.net; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.channels.ReadableByteChannel; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.net.unix.DomainSocket; - -/** - * Represents a connection to a peer. - */ -@InterfaceAudience.Private -public interface Peer extends Closeable { - /** - * @return The input stream channel associated with this - * peer, or null if it has none. - */ - public ReadableByteChannel getInputStreamChannel(); - - /** - * Set the read timeout on this peer. - * - * @param timeoutMs The timeout in milliseconds. - */ - public void setReadTimeout(int timeoutMs) throws IOException; - - /** - * @return The receive buffer size. - */ - public int getReceiveBufferSize() throws IOException; - - /** - * @return True if TCP_NODELAY is turned on. - */ - public boolean getTcpNoDelay() throws IOException; - - /** - * Set the write timeout on this peer. - * - * Note: this is not honored for BasicInetPeer. - * See {@link BasicSocketPeer#setWriteTimeout} for details. - * - * @param timeoutMs The timeout in milliseconds. - */ - public void setWriteTimeout(int timeoutMs) throws IOException; - - /** - * @return true only if the peer is closed. - */ - public boolean isClosed(); - - /** - * Close the peer. - * - * It's safe to re-close a Peer that is already closed. - */ - public void close() throws IOException; - - /** - * @return A string representing the remote end of our - * connection to the peer. - */ - public String getRemoteAddressString(); - - /** - * @return A string representing the local end of our - * connection to the peer. - */ - public String getLocalAddressString(); - - /** - * @return An InputStream associated with the Peer. - * This InputStream will be valid until you close - * this peer with Peer#close. - */ - public InputStream getInputStream() throws IOException; - - /** - * @return An OutputStream associated with the Peer. - * This OutputStream will be valid until you close - * this peer with Peer#close. - */ - public OutputStream getOutputStream() throws IOException; - - /** - * @return True if the peer resides on the same - * computer as we. - */ - public boolean isLocal(); - - /** - * @return The DomainSocket associated with the current - * peer, or null if there is none. - */ - public DomainSocket getDomainSocket(); - - /** - * Return true if the channel is secure. - * - * @return True if our channel to this peer is not - * susceptible to man-in-the-middle attacks. - */ - public boolean hasSecureChannel(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java deleted file mode 100644 index 5f86e52..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** Block Construction Stage */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public enum BlockConstructionStage { - /** The enumerates are always listed as regular stage followed by the - * recovery stage. - * Changing this order will make getRecoveryStage not working. - */ - // pipeline set up for block append - PIPELINE_SETUP_APPEND, - // pipeline set up for failed PIPELINE_SETUP_APPEND recovery - PIPELINE_SETUP_APPEND_RECOVERY, - // data streaming - DATA_STREAMING, - // pipeline setup for failed data streaming recovery - PIPELINE_SETUP_STREAMING_RECOVERY, - // close the block and pipeline - PIPELINE_CLOSE, - // Recover a failed PIPELINE_CLOSE - PIPELINE_CLOSE_RECOVERY, - // pipeline set up for block creation - PIPELINE_SETUP_CREATE, - // transfer RBW for adding datanodes - TRANSFER_RBW, - // transfer Finalized for adding datanodes - TRANSFER_FINALIZED; - - final static private byte RECOVERY_BIT = (byte)1; - - /** - * get the recovery stage of this stage - */ - public BlockConstructionStage getRecoveryStage() { - if (this == PIPELINE_SETUP_CREATE) { - throw new IllegalArgumentException( "Unexpected blockStage " + this); - } else { - return values()[ordinal()|RECOVERY_BIT]; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java deleted file mode 100644 index 28097ab..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceInfo; -import org.apache.htrace.TraceScope; - -/** - * Static utilities for dealing with the protocol buffers used by the - * Data Transfer Protocol. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public abstract class DataTransferProtoUtil { - static BlockConstructionStage fromProto( - OpWriteBlockProto.BlockConstructionStage stage) { - return BlockConstructionStage.valueOf(stage.name()); - } - - static OpWriteBlockProto.BlockConstructionStage toProto( - BlockConstructionStage stage) { - return OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()); - } - - public static ChecksumProto toProto(DataChecksum checksum) { - ChecksumTypeProto type = PBHelperClient.convert(checksum.getChecksumType()); - // ChecksumType#valueOf never returns null - return ChecksumProto.newBuilder() - .setBytesPerChecksum(checksum.getBytesPerChecksum()) - .setType(type) - .build(); - } - - public static DataChecksum fromProto(ChecksumProto proto) { - if (proto == null) return null; - - int bytesPerChecksum = proto.getBytesPerChecksum(); - DataChecksum.Type type = PBHelperClient.convert(proto.getType()); - return DataChecksum.newDataChecksum(type, bytesPerChecksum); - } - - static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk, - String client, Token<BlockTokenIdentifier> blockToken) { - ClientOperationHeaderProto header = - ClientOperationHeaderProto.newBuilder() - .setBaseHeader(buildBaseHeader(blk, blockToken)) - .setClientName(client) - .build(); - return header; - } - - static BaseHeaderProto buildBaseHeader(ExtendedBlock blk, - Token<BlockTokenIdentifier> blockToken) { - BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder() - .setBlock(PBHelperClient.convert(blk)) - .setToken(PBHelperClient.convert(blockToken)); - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); - builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() - .setTraceId(s.getTraceId()) - .setParentId(s.getSpanId())); - } - return builder.build(); - } - - public static TraceInfo fromProto(DataTransferTraceInfoProto proto) { - if (proto == null) return null; - if (!proto.hasTraceId()) return null; - return new TraceInfo(proto.getTraceId(), proto.getParentId()); - } - - public static TraceScope continueTraceSpan(ClientOperationHeaderProto header, - String description) { - return continueTraceSpan(header.getBaseHeader(), description); - } - - public static TraceScope continueTraceSpan(BaseHeaderProto header, - String description) { - return continueTraceSpan(header.getTraceInfo(), description); - } - - public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto, - String description) { - TraceScope scope = null; - TraceInfo info = fromProto(proto); - if (info != null) { - scope = Trace.startSpan(description, info); - } - return scope; - } - - public static void checkBlockOpStatus( - BlockOpResponseProto response, - String logInfo) throws IOException { - if (response.getStatus() != Status.SUCCESS) { - if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) { - throw new InvalidBlockTokenException( - "Got access token error" - + ", status message " + response.getMessage() - + ", " + logInfo - ); - } else { - throw new IOException( - "Got error" - + ", status message " + response.getMessage() - + ", " + logInfo - ); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java deleted file mode 100644 index 1f7e378..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ /dev/null @@ -1,202 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Transfer data to/from datanode using a streaming protocol. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface DataTransferProtocol { - public static final Logger LOG = LoggerFactory.getLogger(DataTransferProtocol.class); - - /** Version for data transfers between clients and datanodes - * This should change when serialization of DatanodeInfo, not just - * when protocol changes. It is not very obvious. - */ - /* - * Version 28: - * Declare methods in DataTransferProtocol interface. - */ - public static final int DATA_TRANSFER_VERSION = 28; - - /** - * Read a block. - * - * @param blk the block being read. - * @param blockToken security token for accessing the block. - * @param clientName client's name. - * @param blockOffset offset of the block. - * @param length maximum number of bytes for this read. - * @param sendChecksum if false, the DN should skip reading and sending - * checksums - * @param cachingStrategy The caching strategy to use. - */ - public void readBlock(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken, - final String clientName, - final long blockOffset, - final long length, - final boolean sendChecksum, - final CachingStrategy cachingStrategy) throws IOException; - - /** - * Write a block to a datanode pipeline. - * The receiver datanode of this call is the next datanode in the pipeline. - * The other downstream datanodes are specified by the targets parameter. - * Note that the receiver {@link DatanodeInfo} is not required in the - * parameter list since the receiver datanode knows its info. However, the - * {@link StorageType} for storing the replica in the receiver datanode is a - * parameter since the receiver datanode may support multiple storage types. - * - * @param blk the block being written. - * @param storageType for storing the replica in the receiver datanode. - * @param blockToken security token for accessing the block. - * @param clientName client's name. - * @param targets other downstream datanodes in the pipeline. - * @param targetStorageTypes target {@link StorageType}s corresponding - * to the target datanodes. - * @param source source datanode. - * @param stage pipeline stage. - * @param pipelineSize the size of the pipeline. - * @param minBytesRcvd minimum number of bytes received. - * @param maxBytesRcvd maximum number of bytes received. - * @param latestGenerationStamp the latest generation stamp of the block. - * @param pinning whether to pin the block, so Balancer won't move it. - * @param targetPinnings whether to pin the block on target datanode - */ - public void writeBlock(final ExtendedBlock blk, - final StorageType storageType, - final Token<BlockTokenIdentifier> blockToken, - final String clientName, - final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes, - final DatanodeInfo source, - final BlockConstructionStage stage, - final int pipelineSize, - final long minBytesRcvd, - final long maxBytesRcvd, - final long latestGenerationStamp, - final DataChecksum requestedChecksum, - final CachingStrategy cachingStrategy, - final boolean allowLazyPersist, - final boolean pinning, - final boolean[] targetPinnings) throws IOException; - /** - * Transfer a block to another datanode. - * The block stage must be - * either {@link BlockConstructionStage#TRANSFER_RBW} - * or {@link BlockConstructionStage#TRANSFER_FINALIZED}. - * - * @param blk the block being transferred. - * @param blockToken security token for accessing the block. - * @param clientName client's name. - * @param targets target datanodes. - */ - public void transferBlock(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken, - final String clientName, - final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes) throws IOException; - - /** - * Request short circuit access file descriptors from a DataNode. - * - * @param blk The block to get file descriptors for. - * @param blockToken Security token for accessing the block. - * @param slotId The shared memory slot id to use, or null - * to use no slot id. - * @param maxVersion Maximum version of the block data the client - * can understand. - * @param supportsReceiptVerification True if the client supports - * receipt verification. - */ - public void requestShortCircuitFds(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken, - SlotId slotId, int maxVersion, boolean supportsReceiptVerification) - throws IOException; - - /** - * Release a pair of short-circuit FDs requested earlier. - * - * @param slotId SlotID used by the earlier file descriptors. - */ - public void releaseShortCircuitFds(final SlotId slotId) throws IOException; - - /** - * Request a short circuit shared memory area from a DataNode. - * - * @param clientName The name of the client. - */ - public void requestShortCircuitShm(String clientName) throws IOException; - - /** - * Receive a block from a source datanode - * and then notifies the namenode - * to remove the copy from the original datanode. - * Note that the source datanode and the original datanode can be different. - * It is used for balancing purpose. - * - * @param blk the block being replaced. - * @param storageType the {@link StorageType} for storing the block. - * @param blockToken security token for accessing the block. - * @param delHint the hint for deleting the block in the original datanode. - * @param source the source datanode for receiving the block. - */ - public void replaceBlock(final ExtendedBlock blk, - final StorageType storageType, - final Token<BlockTokenIdentifier> blockToken, - final String delHint, - final DatanodeInfo source) throws IOException; - - /** - * Copy a block. - * It is used for balancing purpose. - * - * @param blk the block being copied. - * @param blockToken security token for accessing the block. - */ - public void copyBlock(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken) throws IOException; - - /** - * Get block checksum (MD5 of CRC32). - * - * @param blk a block. - * @param blockToken security token for accessing the block. - * @throws IOException - */ - public void blockChecksum(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java deleted file mode 100644 index 3077498..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** Operation */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public enum Op { - WRITE_BLOCK((byte)80), - READ_BLOCK((byte)81), - READ_METADATA((byte)82), - REPLACE_BLOCK((byte)83), - COPY_BLOCK((byte)84), - BLOCK_CHECKSUM((byte)85), - TRANSFER_BLOCK((byte)86), - REQUEST_SHORT_CIRCUIT_FDS((byte)87), - RELEASE_SHORT_CIRCUIT_FDS((byte)88), - REQUEST_SHORT_CIRCUIT_SHM((byte)89), - CUSTOM((byte)127); - - /** The code for this operation. */ - public final byte code; - - private Op(byte code) { - this.code = code; - } - - private static final int FIRST_CODE = values()[0].code; - /** Return the object represented by the code. */ - private static Op valueOf(byte code) { - final int i = (code & 0xff) - FIRST_CODE; - return i < 0 || i >= values().length? null: values()[i]; - } - - /** Read from in */ - public static Op read(DataInput in) throws IOException { - return valueOf(in.readByte()); - } - - /** Write to out */ - public void write(DataOutput out) throws IOException { - out.write(code); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java deleted file mode 100644 index 2d11dc2..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto; - -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; - -import org.apache.htrace.Trace; -import org.apache.htrace.Span; - -import com.google.protobuf.Message; - -/** Sender */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class Sender implements DataTransferProtocol { - private final DataOutputStream out; - - /** Create a sender for DataTransferProtocol with a output stream. */ - public Sender(final DataOutputStream out) { - this.out = out; - } - - /** Initialize a operation. */ - private static void op(final DataOutput out, final Op op - ) throws IOException { - out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); - op.write(out); - } - - private static void send(final DataOutputStream out, final Op opcode, - final Message proto) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName() - + ": " + proto); - } - op(out, opcode); - proto.writeDelimitedTo(out); - out.flush(); - } - - static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) { - CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder(); - if (cachingStrategy.getReadahead() != null) { - builder.setReadahead(cachingStrategy.getReadahead().longValue()); - } - if (cachingStrategy.getDropBehind() != null) { - builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue()); - } - return builder.build(); - } - - @Override - public void readBlock(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken, - final String clientName, - final long blockOffset, - final long length, - final boolean sendChecksum, - final CachingStrategy cachingStrategy) throws IOException { - - OpReadBlockProto proto = OpReadBlockProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken)) - .setOffset(blockOffset) - .setLen(length) - .setSendChecksums(sendChecksum) - .setCachingStrategy(getCachingStrategy(cachingStrategy)) - .build(); - - send(out, Op.READ_BLOCK, proto); - } - - - @Override - public void writeBlock(final ExtendedBlock blk, - final StorageType storageType, - final Token<BlockTokenIdentifier> blockToken, - final String clientName, - final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes, - final DatanodeInfo source, - final BlockConstructionStage stage, - final int pipelineSize, - final long minBytesRcvd, - final long maxBytesRcvd, - final long latestGenerationStamp, - DataChecksum requestedChecksum, - final CachingStrategy cachingStrategy, - final boolean allowLazyPersist, - final boolean pinning, - final boolean[] targetPinnings) throws IOException { - ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader( - blk, clientName, blockToken); - - ChecksumProto checksumProto = - DataTransferProtoUtil.toProto(requestedChecksum); - - OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() - .setHeader(header) - .setStorageType(PBHelperClient.convertStorageType(storageType)) - .addAllTargets(PBHelperClient.convert(targets, 1)) - .addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes, 1)) - .setStage(toProto(stage)) - .setPipelineSize(pipelineSize) - .setMinBytesRcvd(minBytesRcvd) - .setMaxBytesRcvd(maxBytesRcvd) - .setLatestGenerationStamp(latestGenerationStamp) - .setRequestedChecksum(checksumProto) - .setCachingStrategy(getCachingStrategy(cachingStrategy)) - .setAllowLazyPersist(allowLazyPersist) - .setPinning(pinning) - .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1)); - - if (source != null) { - proto.setSource(PBHelperClient.convertDatanodeInfo(source)); - } - - send(out, Op.WRITE_BLOCK, proto.build()); - } - - @Override - public void transferBlock(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken, - final String clientName, - final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes) throws IOException { - - OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildClientHeader( - blk, clientName, blockToken)) - .addAllTargets(PBHelperClient.convert(targets)) - .addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes)) - .build(); - - send(out, Op.TRANSFER_BLOCK, proto); - } - - @Override - public void requestShortCircuitFds(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken, - SlotId slotId, int maxVersion, boolean supportsReceiptVerification) - throws IOException { - OpRequestShortCircuitAccessProto.Builder builder = - OpRequestShortCircuitAccessProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader( - blk, blockToken)).setMaxVersion(maxVersion); - if (slotId != null) { - builder.setSlotId(PBHelperClient.convert(slotId)); - } - builder.setSupportsReceiptVerification(supportsReceiptVerification); - OpRequestShortCircuitAccessProto proto = builder.build(); - send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto); - } - - @Override - public void releaseShortCircuitFds(SlotId slotId) throws IOException { - ReleaseShortCircuitAccessRequestProto.Builder builder = - ReleaseShortCircuitAccessRequestProto.newBuilder(). - setSlotId(PBHelperClient.convert(slotId)); - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); - builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() - .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); - } - ReleaseShortCircuitAccessRequestProto proto = builder.build(); - send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto); - } - - @Override - public void requestShortCircuitShm(String clientName) throws IOException { - ShortCircuitShmRequestProto.Builder builder = - ShortCircuitShmRequestProto.newBuilder(). - setClientName(clientName); - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); - builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() - .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); - } - ShortCircuitShmRequestProto proto = builder.build(); - send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto); - } - - @Override - public void replaceBlock(final ExtendedBlock blk, - final StorageType storageType, - final Token<BlockTokenIdentifier> blockToken, - final String delHint, - final DatanodeInfo source) throws IOException { - OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) - .setStorageType(PBHelperClient.convertStorageType(storageType)) - .setDelHint(delHint) - .setSource(PBHelperClient.convertDatanodeInfo(source)) - .build(); - - send(out, Op.REPLACE_BLOCK, proto); - } - - @Override - public void copyBlock(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken) throws IOException { - OpCopyBlockProto proto = OpCopyBlockProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) - .build(); - - send(out, Op.COPY_BLOCK, proto); - } - - @Override - public void blockChecksum(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken) throws IOException { - OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) - .build(); - - send(out, Op.BLOCK_CHECKSUM, proto); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java deleted file mode 100644 index edf658a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ /dev/null @@ -1,254 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocolPB; - -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; -import com.google.protobuf.CodedInputStream; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; -import org.apache.hadoop.hdfs.util.ExactSizeInputStream; -import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; - -/** - * Utilities for converting protobuf classes to and from implementation classes - * and other helper utilities to help in dealing with protobuf. - * - * Note that when converting from an internal type to protobuf type, the - * converter never return null for protobuf type. The check for internal type - * being null must be done before calling the convert() method. - */ -public class PBHelperClient { - private PBHelperClient() { - /** Hidden constructor */ - } - - public static ByteString getByteString(byte[] bytes) { - return ByteString.copyFrom(bytes); - } - - public static ShmId convert(ShortCircuitShmIdProto shmId) { - return new ShmId(shmId.getHi(), shmId.getLo()); - } - - public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) { - return DataChecksum.Type.valueOf(type.getNumber()); - } - - public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) { - return HdfsProtos.ChecksumTypeProto.valueOf(type.id); - } - - public static ExtendedBlockProto convert(final ExtendedBlock b) { - if (b == null) return null; - return ExtendedBlockProto.newBuilder(). - setPoolId(b.getBlockPoolId()). - setBlockId(b.getBlockId()). - setNumBytes(b.getNumBytes()). - setGenerationStamp(b.getGenerationStamp()). - build(); - } - - public static TokenProto convert(Token<?> tok) { - return TokenProto.newBuilder(). - setIdentifier(ByteString.copyFrom(tok.getIdentifier())). - setPassword(ByteString.copyFrom(tok.getPassword())). - setKind(tok.getKind().toString()). - setService(tok.getService().toString()).build(); - } - - public static ShortCircuitShmIdProto convert(ShmId shmId) { - return ShortCircuitShmIdProto.newBuilder(). - setHi(shmId.getHi()). - setLo(shmId.getLo()). - build(); - - } - - public static ShortCircuitShmSlotProto convert(SlotId slotId) { - return ShortCircuitShmSlotProto.newBuilder(). - setShmId(convert(slotId.getShmId())). - setSlotIdx(slotId.getSlotIdx()). - build(); - } - - public static DatanodeIDProto convert(DatanodeID dn) { - // For wire compatibility with older versions we transmit the StorageID - // which is the same as the DatanodeUuid. Since StorageID is a required - // field we pass the empty string if the DatanodeUuid is not yet known. - return DatanodeIDProto.newBuilder() - .setIpAddr(dn.getIpAddr()) - .setHostName(dn.getHostName()) - .setXferPort(dn.getXferPort()) - .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "") - .setInfoPort(dn.getInfoPort()) - .setInfoSecurePort(dn.getInfoSecurePort()) - .setIpcPort(dn.getIpcPort()).build(); - } - - public static DatanodeInfoProto.AdminState convert( - final DatanodeInfo.AdminStates inAs) { - switch (inAs) { - case NORMAL: return DatanodeInfoProto.AdminState.NORMAL; - case DECOMMISSION_INPROGRESS: - return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS; - case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED; - default: return DatanodeInfoProto.AdminState.NORMAL; - } - } - - public static DatanodeInfoProto convert(DatanodeInfo info) { - DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder(); - if (info.getNetworkLocation() != null) { - builder.setLocation(info.getNetworkLocation()); - } - builder - .setId(convert((DatanodeID) info)) - .setCapacity(info.getCapacity()) - .setDfsUsed(info.getDfsUsed()) - .setRemaining(info.getRemaining()) - .setBlockPoolUsed(info.getBlockPoolUsed()) - .setCacheCapacity(info.getCacheCapacity()) - .setCacheUsed(info.getCacheUsed()) - .setLastUpdate(info.getLastUpdate()) - .setLastUpdateMonotonic(info.getLastUpdateMonotonic()) - .setXceiverCount(info.getXceiverCount()) - .setAdminState(convert(info.getAdminState())) - .build(); - return builder.build(); - } - - public static List<? extends HdfsProtos.DatanodeInfoProto> convert( - DatanodeInfo[] dnInfos) { - return convert(dnInfos, 0); - } - - /** - * Copy from {@code dnInfos} to a target of list of same size starting at - * {@code startIdx}. - */ - public static List<? extends HdfsProtos.DatanodeInfoProto> convert( - DatanodeInfo[] dnInfos, int startIdx) { - if (dnInfos == null) - return null; - ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists - .newArrayListWithCapacity(dnInfos.length); - for (int i = startIdx; i < dnInfos.length; i++) { - protos.add(convert(dnInfos[i])); - } - return protos; - } - - public static List<Boolean> convert(boolean[] targetPinnings, int idx) { - List<Boolean> pinnings = new ArrayList<>(); - if (targetPinnings == null) { - pinnings.add(Boolean.FALSE); - } else { - for (; idx < targetPinnings.length; ++idx) { - pinnings.add(targetPinnings[idx]); - } - } - return pinnings; - } - - static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) { - if (di == null) return null; - return convert(di); - } - - public static StorageTypeProto convertStorageType(StorageType type) { - switch(type) { - case DISK: - return StorageTypeProto.DISK; - case SSD: - return StorageTypeProto.SSD; - case ARCHIVE: - return StorageTypeProto.ARCHIVE; - case RAM_DISK: - return StorageTypeProto.RAM_DISK; - default: - throw new IllegalStateException( - "BUG: StorageType not found, type=" + type); - } - } - - public static StorageType convertStorageType(StorageTypeProto type) { - switch(type) { - case DISK: - return StorageType.DISK; - case SSD: - return StorageType.SSD; - case ARCHIVE: - return StorageType.ARCHIVE; - case RAM_DISK: - return StorageType.RAM_DISK; - default: - throw new IllegalStateException( - "BUG: StorageTypeProto not found, type=" + type); - } - } - - public static List<StorageTypeProto> convertStorageTypes( - StorageType[] types) { - return convertStorageTypes(types, 0); - } - - public static List<StorageTypeProto> convertStorageTypes( - StorageType[] types, int startIdx) { - if (types == null) { - return null; - } - final List<StorageTypeProto> protos = new ArrayList<>( - types.length); - for (int i = startIdx; i < types.length; ++i) { - protos.add(PBHelperClient.convertStorageType(types[i])); - } - return protos; - } - - public static InputStream vintPrefixed(final InputStream input) - throws IOException { - final int firstByte = input.read(); - if (firstByte == -1) { - throw new EOFException("Premature EOF: no length prefix available"); - } - - int size = CodedInputStream.readRawVarint32(firstByte, input); - assert size >= 0; - return new ExactSizeInputStream(input, size); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java deleted file mode 100644 index 2fa86fa..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.security.token.block; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * Access token verification failed. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class InvalidBlockTokenException extends IOException { - private static final long serialVersionUID = 168L; - - public InvalidBlockTokenException() { - super(); - } - - public InvalidBlockTokenException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java deleted file mode 100644 index 215df13..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode; - -/** - * The caching strategy we should use for an HDFS read or write operation. - */ -public class CachingStrategy { - private final Boolean dropBehind; // null = use server defaults - private final Long readahead; // null = use server defaults - - public static CachingStrategy newDefaultStrategy() { - return new CachingStrategy(null, null); - } - - public static CachingStrategy newDropBehind() { - return new CachingStrategy(true, null); - } - - public static class Builder { - private Boolean dropBehind; - private Long readahead; - - public Builder(CachingStrategy prev) { - this.dropBehind = prev.dropBehind; - this.readahead = prev.readahead; - } - - public Builder setDropBehind(Boolean dropBehind) { - this.dropBehind = dropBehind; - return this; - } - - public Builder setReadahead(Long readahead) { - this.readahead = readahead; - return this; - } - - public CachingStrategy build() { - return new CachingStrategy(dropBehind, readahead); - } - } - - public CachingStrategy(Boolean dropBehind, Long readahead) { - this.dropBehind = dropBehind; - this.readahead = readahead; - } - - public Boolean getDropBehind() { - return dropBehind; - } - - public Long getReadahead() { - return readahead; - } - - public String toString() { - return "CachingStrategy(dropBehind=" + dropBehind + - ", readahead=" + readahead + ")"; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java deleted file mode 100644 index 81cc68d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.shortcircuit; - -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Iterator; - -import org.apache.hadoop.hdfs.net.DomainPeer; -import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.EndpointShmManager; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.net.unix.DomainSocketWatcher; - -import com.google.common.base.Preconditions; - -/** - * DfsClientShm is a subclass of ShortCircuitShm which is used by the - * DfsClient. - * When the UNIX domain socket associated with this shared memory segment - * closes unexpectedly, we mark the slots inside this segment as disconnected. - * ShortCircuitReplica objects that contain disconnected slots are stale, - * and will not be used to service new reads or mmap operations. - * However, in-progress read or mmap operations will continue to proceed. - * Once the last slot is deallocated, the segment can be safely munmapped. - * - * Slots may also become stale because the associated replica has been deleted - * on the DataNode. In this case, the DataNode will clear the 'valid' bit. - * The client will then see these slots as stale (see - * #{ShortCircuitReplica#isStale}). - */ -public class DfsClientShm extends ShortCircuitShm - implements DomainSocketWatcher.Handler { - /** - * The EndpointShmManager associated with this shared memory segment. - */ - private final EndpointShmManager manager; - - /** - * The UNIX domain socket associated with this DfsClientShm. - * We rely on the DomainSocketWatcher to close the socket associated with - * this DomainPeer when necessary. - */ - private final DomainPeer peer; - - /** - * True if this shared memory segment has lost its connection to the - * DataNode. - * - * {@link DfsClientShm#handle} sets this to true. - */ - private boolean disconnected = false; - - DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager, - DomainPeer peer) throws IOException { - super(shmId, stream); - this.manager = manager; - this.peer = peer; - } - - public EndpointShmManager getEndpointShmManager() { - return manager; - } - - public DomainPeer getPeer() { - return peer; - } - - /** - * Determine if the shared memory segment is disconnected from the DataNode. - * - * This must be called with the DfsClientShmManager lock held. - * - * @return True if the shared memory segment is stale. - */ - public synchronized boolean isDisconnected() { - return disconnected; - } - - /** - * Handle the closure of the UNIX domain socket associated with this shared - * memory segment by marking this segment as stale. - * - * If there are no slots associated with this shared memory segment, it will - * be freed immediately in this function. - */ - @Override - public boolean handle(DomainSocket sock) { - manager.unregisterShm(getShmId()); - synchronized (this) { - Preconditions.checkState(!disconnected); - disconnected = true; - boolean hadSlots = false; - for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) { - Slot slot = iter.next(); - slot.makeInvalid(); - hadSlots = true; - } - if (!hadSlots) { - free(); - } - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java deleted file mode 100644 index f70398a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java +++ /dev/null @@ -1,522 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.shortcircuit; - -import java.io.BufferedOutputStream; -import java.io.Closeable; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map.Entry; -import java.util.TreeMap; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.commons.lang.mutable.MutableBoolean; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.ExtendedBlockId; -import org.apache.hadoop.hdfs.net.DomainPeer; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.net.unix.DomainSocketWatcher; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Manages short-circuit memory segments for an HDFS client. - * - * Clients are responsible for requesting and releasing shared memory segments used - * for communicating with the DataNode. The client will try to allocate new slots - * in the set of existing segments, falling back to getting a new segment from the - * DataNode via {@link DataTransferProtocol#requestShortCircuitFds}. - * - * The counterpart to this class on the DataNode is {@link ShortCircuitRegistry}. - * See {@link ShortCircuitRegistry} for more information on the communication protocol. - */ -@InterfaceAudience.Private -public class DfsClientShmManager implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger( - DfsClientShmManager.class); - - /** - * Manages short-circuit memory segments that pertain to a given DataNode. - */ - class EndpointShmManager { - /** - * The datanode we're managing. - */ - private final DatanodeInfo datanode; - - /** - * Shared memory segments which have no empty slots. - * - * Protected by the manager lock. - */ - private final TreeMap<ShmId, DfsClientShm> full = - new TreeMap<ShmId, DfsClientShm>(); - - /** - * Shared memory segments which have at least one empty slot. - * - * Protected by the manager lock. - */ - private final TreeMap<ShmId, DfsClientShm> notFull = - new TreeMap<ShmId, DfsClientShm>(); - - /** - * True if this datanode doesn't support short-circuit shared memory - * segments. - * - * Protected by the manager lock. - */ - private boolean disabled = false; - - /** - * True if we're in the process of loading a shared memory segment from - * this DataNode. - * - * Protected by the manager lock. - */ - private boolean loading = false; - - EndpointShmManager (DatanodeInfo datanode) { - this.datanode = datanode; - } - - /** - * Pull a slot out of a preexisting shared memory segment. - * - * Must be called with the manager lock held. - * - * @param blockId The blockId to put inside the Slot object. - * - * @return null if none of our shared memory segments contain a - * free slot; the slot object otherwise. - */ - private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) { - if (notFull.isEmpty()) { - return null; - } - Entry<ShmId, DfsClientShm> entry = notFull.firstEntry(); - DfsClientShm shm = entry.getValue(); - ShmId shmId = shm.getShmId(); - Slot slot = shm.allocAndRegisterSlot(blockId); - if (shm.isFull()) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() + - " out of " + shm); - } - DfsClientShm removedShm = notFull.remove(shmId); - Preconditions.checkState(removedShm == shm); - full.put(shmId, shm); - } else { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": pulled slot " + slot.getSlotIdx() + - " out of " + shm); - } - } - return slot; - } - - /** - * Ask the DataNode for a new shared memory segment. This function must be - * called with the manager lock held. We will release the lock while - * communicating with the DataNode. - * - * @param clientName The current client name. - * @param peer The peer to use to talk to the DataNode. - * - * @return Null if the DataNode does not support shared memory - * segments, or experienced an error creating the - * shm. The shared memory segment itself on success. - * @throws IOException If there was an error communicating over the socket. - * We will not throw an IOException unless the socket - * itself (or the network) is the problem. - */ - private DfsClientShm requestNewShm(String clientName, DomainPeer peer) - throws IOException { - final DataOutputStream out = - new DataOutputStream( - new BufferedOutputStream(peer.getOutputStream())); - new Sender(out).requestShortCircuitShm(clientName); - ShortCircuitShmResponseProto resp = - ShortCircuitShmResponseProto.parseFrom( - PBHelperClient.vintPrefixed(peer.getInputStream())); - String error = resp.hasError() ? resp.getError() : "(unknown)"; - switch (resp.getStatus()) { - case SUCCESS: - DomainSocket sock = peer.getDomainSocket(); - byte buf[] = new byte[1]; - FileInputStream fis[] = new FileInputStream[1]; - if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) { - throw new EOFException("got EOF while trying to transfer the " + - "file descriptor for the shared memory segment."); - } - if (fis[0] == null) { - throw new IOException("the datanode " + datanode + " failed to " + - "pass a file descriptor for the shared memory segment."); - } - try { - DfsClientShm shm = - new DfsClientShm(PBHelperClient.convert(resp.getId()), - fis[0], this, peer); - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": createNewShm: created " + shm); - } - return shm; - } finally { - try { - fis[0].close(); - } catch (Throwable e) { - LOG.debug("Exception in closing " + fis[0], e); - } - } - case ERROR_UNSUPPORTED: - // The DataNode just does not support short-circuit shared memory - // access, and we should stop asking. - LOG.info(this + ": datanode does not support short-circuit " + - "shared memory access: " + error); - disabled = true; - return null; - default: - // The datanode experienced some kind of unexpected error when trying to - // create the short-circuit shared memory segment. - LOG.warn(this + ": error requesting short-circuit shared memory " + - "access: " + error); - return null; - } - } - - /** - * Allocate a new shared memory slot connected to this datanode. - * - * Must be called with the EndpointShmManager lock held. - * - * @param peer The peer to use to talk to the DataNode. - * @param usedPeer (out param) Will be set to true if we used the peer. - * When a peer is used - * - * @param clientName The client name. - * @param blockId The block ID to use. - * @return null if the DataNode does not support shared memory - * segments, or experienced an error creating the - * shm. The shared memory segment itself on success. - * @throws IOException If there was an error communicating over the socket. - */ - Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer, - String clientName, ExtendedBlockId blockId) throws IOException { - while (true) { - if (closed) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": the DfsClientShmManager has been closed."); - } - return null; - } - if (disabled) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": shared memory segment access is disabled."); - } - return null; - } - // Try to use an existing slot. - Slot slot = allocSlotFromExistingShm(blockId); - if (slot != null) { - return slot; - } - // There are no free slots. If someone is loading more slots, wait - // for that to finish. - if (loading) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": waiting for loading to finish..."); - } - finishedLoading.awaitUninterruptibly(); - } else { - // Otherwise, load the slot ourselves. - loading = true; - lock.unlock(); - DfsClientShm shm; - try { - shm = requestNewShm(clientName, peer); - if (shm == null) continue; - // See #{DfsClientShmManager#domainSocketWatcher} for details - // about why we do this before retaking the manager lock. - domainSocketWatcher.add(peer.getDomainSocket(), shm); - // The DomainPeer is now our responsibility, and should not be - // closed by the caller. - usedPeer.setValue(true); - } finally { - lock.lock(); - loading = false; - finishedLoading.signalAll(); - } - if (shm.isDisconnected()) { - // If the peer closed immediately after the shared memory segment - // was created, the DomainSocketWatcher callback might already have - // fired and marked the shm as disconnected. In this case, we - // obviously don't want to add the SharedMemorySegment to our list - // of valid not-full segments. - if (LOG.isDebugEnabled()) { - LOG.debug(this + ": the UNIX domain socket associated with " + - "this short-circuit memory closed before we could make " + - "use of the shm."); - } - } else { - notFull.put(shm.getShmId(), shm); - } - } - } - } - - /** - * Stop tracking a slot. - * - * Must be called with the EndpointShmManager lock held. - * - * @param slot The slot to release. - */ - void freeSlot(Slot slot) { - DfsClientShm shm = (DfsClientShm)slot.getShm(); - shm.unregisterSlot(slot.getSlotIdx()); - if (shm.isDisconnected()) { - // Stale shared memory segments should not be tracked here. - Preconditions.checkState(!full.containsKey(shm.getShmId())); - Preconditions.checkState(!notFull.containsKey(shm.getShmId())); - if (shm.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": freeing empty stale " + shm); - } - shm.free(); - } - } else { - ShmId shmId = shm.getShmId(); - full.remove(shmId); // The shm can't be full if we just freed a slot. - if (shm.isEmpty()) { - notFull.remove(shmId); - - // If the shared memory segment is now empty, we call shutdown(2) on - // the UNIX domain socket associated with it. The DomainSocketWatcher, - // which is watching this socket, will call DfsClientShm#handle, - // cleaning up this shared memory segment. - // - // See #{DfsClientShmManager#domainSocketWatcher} for details about why - // we don't want to call DomainSocketWatcher#remove directly here. - // - // Note that we could experience 'fragmentation' here, where the - // DFSClient allocates a bunch of slots in different shared memory - // segments, and then frees most of them, but never fully empties out - // any segment. We make some attempt to avoid this fragmentation by - // always allocating new slots out of the shared memory segment with the - // lowest ID, but it could still occur. In most workloads, - // fragmentation should not be a major concern, since it doesn't impact - // peak file descriptor usage or the speed of allocation. - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": shutting down UNIX domain socket for " + - "empty " + shm); - } - shutdown(shm); - } else { - notFull.put(shmId, shm); - } - } - } - - /** - * Unregister a shared memory segment. - * - * Once a segment is unregistered, we will not allocate any more slots - * inside that segment. - * - * The DomainSocketWatcher calls this while holding the DomainSocketWatcher - * lock. - * - * @param shmId The ID of the shared memory segment to unregister. - */ - void unregisterShm(ShmId shmId) { - lock.lock(); - try { - full.remove(shmId); - notFull.remove(shmId); - } finally { - lock.unlock(); - } - } - - @Override - public String toString() { - return String.format("EndpointShmManager(%s, parent=%s)", - datanode, DfsClientShmManager.this); - } - - PerDatanodeVisitorInfo getVisitorInfo() { - return new PerDatanodeVisitorInfo(full, notFull, disabled); - } - - final void shutdown(DfsClientShm shm) { - try { - shm.getPeer().getDomainSocket().shutdown(); - } catch (IOException e) { - LOG.warn(this + ": error shutting down shm: got IOException calling " + - "shutdown(SHUT_RDWR)", e); - } - } - } - - private boolean closed = false; - - private final ReentrantLock lock = new ReentrantLock(); - - /** - * A condition variable which is signalled when we finish loading a segment - * from the Datanode. - */ - private final Condition finishedLoading = lock.newCondition(); - - /** - * Information about each Datanode. - */ - private final HashMap<DatanodeInfo, EndpointShmManager> datanodes = - new HashMap<DatanodeInfo, EndpointShmManager>(1); - - /** - * The DomainSocketWatcher which keeps track of the UNIX domain socket - * associated with each shared memory segment. - * - * Note: because the DomainSocketWatcher makes callbacks into this - * DfsClientShmManager object, you must MUST NOT attempt to take the - * DomainSocketWatcher lock while holding the DfsClientShmManager lock, - * or else deadlock might result. This means that most DomainSocketWatcher - * methods are off-limits unless you release the manager lock first. - */ - private final DomainSocketWatcher domainSocketWatcher; - - DfsClientShmManager(int interruptCheckPeriodMs) throws IOException { - this.domainSocketWatcher = new DomainSocketWatcher(interruptCheckPeriodMs, - "client"); - } - - public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer, - MutableBoolean usedPeer, ExtendedBlockId blockId, - String clientName) throws IOException { - lock.lock(); - try { - if (closed) { - LOG.trace(this + ": the DfsClientShmManager isclosed."); - return null; - } - EndpointShmManager shmManager = datanodes.get(datanode); - if (shmManager == null) { - shmManager = new EndpointShmManager(datanode); - datanodes.put(datanode, shmManager); - } - return shmManager.allocSlot(peer, usedPeer, clientName, blockId); - } finally { - lock.unlock(); - } - } - - public void freeSlot(Slot slot) { - lock.lock(); - try { - DfsClientShm shm = (DfsClientShm)slot.getShm(); - shm.getEndpointShmManager().freeSlot(slot); - } finally { - lock.unlock(); - } - } - - @VisibleForTesting - public static class PerDatanodeVisitorInfo { - public final TreeMap<ShmId, DfsClientShm> full; - public final TreeMap<ShmId, DfsClientShm> notFull; - public final boolean disabled; - - PerDatanodeVisitorInfo(TreeMap<ShmId, DfsClientShm> full, - TreeMap<ShmId, DfsClientShm> notFull, boolean disabled) { - this.full = full; - this.notFull = notFull; - this.disabled = disabled; - } - } - - @VisibleForTesting - public interface Visitor { - void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) - throws IOException; - } - - @VisibleForTesting - public void visit(Visitor visitor) throws IOException { - lock.lock(); - try { - HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info = - new HashMap<DatanodeInfo, PerDatanodeVisitorInfo>(); - for (Entry<DatanodeInfo, EndpointShmManager> entry : - datanodes.entrySet()) { - info.put(entry.getKey(), entry.getValue().getVisitorInfo()); - } - visitor.visit(info); - } finally { - lock.unlock(); - } - } - - /** - * Close the DfsClientShmManager. - */ - @Override - public void close() throws IOException { - lock.lock(); - try { - if (closed) return; - closed = true; - } finally { - lock.unlock(); - } - // When closed, the domainSocketWatcher will issue callbacks that mark - // all the outstanding DfsClientShm segments as stale. - try { - domainSocketWatcher.close(); - } catch (Throwable e) { - LOG.debug("Exception in closing " + domainSocketWatcher, e); - } - } - - - @Override - public String toString() { - return String.format("ShortCircuitShmManager(%08x)", - System.identityHashCode(this)); - } - - @VisibleForTesting - public DomainSocketWatcher getDomainSocketWatcher() { - return domainSocketWatcher; - } -}