HDFS-8990. Move RemoteBlockReader to hdfs-client module. Contributed by Mingliang Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/826ae1c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/826ae1c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/826ae1c2

Branch: refs/heads/HDFS-7285
Commit: 826ae1c26d31f87d88efc920b271bec7eec2e17a
Parents: caa04de
Author: Haohui Mai <whe...@apache.org>
Authored: Mon Aug 31 13:54:14 2015 -0700
Committer: Haohui Mai <whe...@apache.org>
Committed: Mon Aug 31 13:54:14 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/RemoteBlockReader.java   | 512 +++++++++++++++++++
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  | 480 +++++++++++++++++
 .../protocol/datatransfer/PacketHeader.java     | 214 ++++++++
 .../protocol/datatransfer/PacketReceiver.java   | 310 +++++++++++
 .../hdfs/util/ByteBufferOutputStream.java       |  49 ++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   1 -
 .../apache/hadoop/hdfs/RemoteBlockReader.java   | 508 ------------------
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  | 477 -----------------
 .../protocol/datatransfer/PacketHeader.java     | 214 --------
 .../protocol/datatransfer/PacketReceiver.java   | 310 ----------
 .../hdfs/util/ByteBufferOutputStream.java       |  49 --
 .../hdfs/TestClientBlockVerification.java       |   4 +-
 13 files changed, 1570 insertions(+), 1561 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
new file mode 100644
index 0000000..7509da5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @deprecated this is an old implementation that is being left around
+ * in case any issues spring up with the new {@link RemoteBlockReader2} implementation.
+ * It will be removed in the next release.
+ */
+@InterfaceAudience.Private
+@Deprecated
+public class RemoteBlockReader extends FSInputChecker implements BlockReader {
+  static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class);
+
+  private final Peer peer;
+  private final DatanodeID datanodeID;
+  private final DataInputStream in;
+  private DataChecksum checksum;
+
+  /** offset in block of the last chunk received */
+  private long lastChunkOffset = -1;
+  private long lastChunkLen = -1;
+  private long lastSeqNo = -1;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+
+  private final long blockId;
+
+  /** offset in block of first chunk - may be less than startOffset
+      if startOffset is not chunk-aligned */
+  private final long firstChunkOffset;
+
+  private final int bytesPerChecksum;
+  private final int checksumSize;
+
+  /**
+   * The total number of bytes we need to transfer from the DN.
+   * This is the amount that the user has requested plus some padding
+   * at the beginning so that the read can begin on a chunk boundary.
+   */
+  private final long bytesNeededToFinish;
+
+  /**
+   * True if we are reading from a local DataNode.
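+   * (i.e. the datanode's transfer address resolves to an address of
+   * this machine; see the constructor.)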
+ */ + private final boolean isLocal; + + private boolean eos = false; + private boolean sentStatusCode = false; + + ByteBuffer checksumBytes = null; + /** Amount of unread data in the current received packet */ + int dataLeft = 0; + + private final PeerCache peerCache; + + /* FSInputChecker interface */ + + /* same interface as inputStream java.io.InputStream#read() + * used by DFSInputStream#read() + * This violates one rule when there is a checksum error: + * "Read should not modify user buffer before successful read" + * because it first reads the data to user buffer and then checks + * the checksum. + */ + @Override + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + + // This has to be set here, *before* the skip, since we can + // hit EOS during the skip, in the case that our entire read + // is smaller than the checksum chunk. + boolean eosBefore = eos; + + //for the first read, skip the extra bytes at the front. + if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) { + // Skip these bytes. But don't call this.skip()! + int toSkip = (int)(startOffset - firstChunkOffset); + if ( super.readAndDiscard(toSkip) != toSkip ) { + // should never happen + throw new IOException("Could not skip required number of bytes"); + } + } + + int nRead = super.read(buf, off, len); + + // if eos was set in the previous read, send a status code to the DN + if (eos && !eosBefore && nRead >= 0) { + if (needChecksum()) { + sendReadResult(peer, Status.CHECKSUM_OK); + } else { + sendReadResult(peer, Status.SUCCESS); + } + } + return nRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + /* How can we make sure we don't throw a ChecksumException, at least + * in majority of the cases?. This one throws. */ + long nSkipped = 0; + while (nSkipped < n) { + int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE); + int ret = readAndDiscard(toSkip); + if (ret <= 0) { + return nSkipped; + } + nSkipped += ret; + } + return nSkipped; + } + + @Override + public int read() throws IOException { + throw new IOException("read() is not expected to be invoked. " + + "Use read(buf, off, len) instead."); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + /* Checksum errors are handled outside the BlockReader. + * DFSInputStream does not always call 'seekToNewSource'. In the + * case of pread(), it just tries a different replica without seeking. + */ + return false; + } + + @Override + public void seek(long pos) throws IOException { + throw new IOException("Seek() is not supported in BlockInputChecker"); + } + + @Override + protected long getChunkPosition(long pos) { + throw new RuntimeException("getChunkPosition() is not supported, " + + "since seek is not required"); + } + + /** + * Makes sure that checksumBytes has enough capacity + * and limit is set to the number of checksum bytes needed + * to be read. 
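+   * For example, with bytesPerChecksum = 512 and a 4-byte checksum,
+   * a 1500-byte packet carries ceil(1500/512) = 3 chunks, so the
+   * limit becomes 3 * 4 = 12 bytes.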
+ */ + private void adjustChecksumBytes(int dataLen) { + int requiredSize = + ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize; + if (checksumBytes == null || requiredSize > checksumBytes.capacity()) { + checksumBytes = ByteBuffer.wrap(new byte[requiredSize]); + } else { + checksumBytes.clear(); + } + checksumBytes.limit(requiredSize); + } + + @Override + protected synchronized int readChunk(long pos, byte[] buf, int offset, + int len, byte[] checksumBuf) + throws IOException { + TraceScope scope = + Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")", + Sampler.NEVER); + try { + return readChunkImpl(pos, buf, offset, len, checksumBuf); + } finally { + scope.close(); + } + } + + private synchronized int readChunkImpl(long pos, byte[] buf, int offset, + int len, byte[] checksumBuf) + throws IOException { + // Read one chunk. + if (eos) { + // Already hit EOF + return -1; + } + + // Read one DATA_CHUNK. + long chunkOffset = lastChunkOffset; + if ( lastChunkLen > 0 ) { + chunkOffset += lastChunkLen; + } + + // pos is relative to the start of the first chunk of the read. + // chunkOffset is relative to the start of the block. + // This makes sure that the read passed from FSInputChecker is the + // for the same chunk we expect to be reading from the DN. + if ( (pos + firstChunkOffset) != chunkOffset ) { + throw new IOException("Mismatch in pos : " + pos + " + " + + firstChunkOffset + " != " + chunkOffset); + } + + // Read next packet if the previous packet has been read completely. + if (dataLeft <= 0) { + //Read packet headers. + PacketHeader header = new PacketHeader(); + header.readFields(in); + + if (LOG.isDebugEnabled()) { + LOG.debug("DFSClient readChunk got header " + header); + } + + // Sanity check the lengths + if (!header.sanityCheck(lastSeqNo)) { + throw new IOException("BlockReader: error in packet header " + + header); + } + + lastSeqNo = header.getSeqno(); + dataLeft = header.getDataLen(); + adjustChecksumBytes(header.getDataLen()); + if (header.getDataLen() > 0) { + IOUtils.readFully(in, checksumBytes.array(), 0, + checksumBytes.limit()); + } + } + + // Sanity checks + assert len >= bytesPerChecksum; + assert checksum != null; + assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0); + + + int checksumsToRead, bytesToRead; + + if (checksumSize > 0) { + + // How many chunks left in our packet - this is a ceiling + // since we may have a partial chunk at the end of the file + int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1; + + // How many chunks we can fit in databuffer + // - note this is a floor since we always read full chunks + int chunksCanFit = Math.min(len / bytesPerChecksum, + checksumBuf.length / checksumSize); + + // How many chunks should we read + checksumsToRead = Math.min(chunksLeft, chunksCanFit); + // How many bytes should we actually read + bytesToRead = Math.min( + checksumsToRead * bytesPerChecksum, // full chunks + dataLeft); // in case we have a partial + } else { + // no checksum + bytesToRead = Math.min(dataLeft, len); + checksumsToRead = 0; + } + + if ( bytesToRead > 0 ) { + // Assert we have enough space + assert bytesToRead <= len; + assert checksumBytes.remaining() >= checksumSize * checksumsToRead; + assert checksumBuf.length >= checksumSize * checksumsToRead; + IOUtils.readFully(in, buf, offset, bytesToRead); + checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead); + } + + dataLeft -= bytesToRead; + assert dataLeft >= 0; + + lastChunkOffset = chunkOffset; + lastChunkLen = bytesToRead; + + // 
If there's no data left in the current packet after satisfying + // this read, and we have satisfied the client read, we expect + // an empty packet header from the DN to signify this. + // Note that pos + bytesToRead may in fact be greater since the + // DN finishes off the entire last chunk. + if (dataLeft == 0 && + pos + bytesToRead >= bytesNeededToFinish) { + + // Read header + PacketHeader hdr = new PacketHeader(); + hdr.readFields(in); + + if (!hdr.isLastPacketInBlock() || + hdr.getDataLen() != 0) { + throw new IOException("Expected empty end-of-read packet! Header: " + + hdr); + } + + eos = true; + } + + if ( bytesToRead == 0 ) { + return -1; + } + + return bytesToRead; + } + + private RemoteBlockReader(String file, String bpid, long blockId, + DataInputStream in, DataChecksum checksum, boolean verifyChecksum, + long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, + DatanodeID datanodeID, PeerCache peerCache) { + // Path is used only for printing block and file information in debug + super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId + + ":" + bpid + ":of:"+ file)/*too non path-like?*/, + 1, verifyChecksum, + checksum.getChecksumSize() > 0? checksum : null, + checksum.getBytesPerChecksum(), + checksum.getChecksumSize()); + + this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. + createSocketAddr(datanodeID.getXferAddr())); + + this.peer = peer; + this.datanodeID = datanodeID; + this.in = in; + this.checksum = checksum; + this.startOffset = Math.max( startOffset, 0 ); + this.blockId = blockId; + + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. + this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); + + this.firstChunkOffset = firstChunkOffset; + lastChunkOffset = firstChunkOffset; + lastChunkLen = -1; + + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + this.peerCache = peerCache; + } + + /** + * Create a new BlockReader specifically to satisfy a read. + * This method also sends the OP_READ_BLOCK request. + * + * @param file File location + * @param block The block object + * @param blockToken The block token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param bufferSize The IO buffer size (not the client buffer size) + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @return New BlockReader instance, or null on error. 
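+   * (Note: on failure this method in fact throws an IOException
+   * rather than returning null.)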
+   */
+  public static RemoteBlockReader newBlockReader(String file,
+      ExtendedBlock block,
+      Token<BlockTokenIdentifier> blockToken,
+      long startOffset, long len,
+      int bufferSize, boolean verifyChecksum,
+      String clientName, Peer peer,
+      DatanodeID datanodeID,
+      PeerCache peerCache,
+      CachingStrategy cachingStrategy)
+      throws IOException {
+    // in and out will be closed when sock is closed (by the caller)
+    final DataOutputStream out =
+        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+        verifyChecksum, cachingStrategy);
+
+    //
+    // Get bytes in block, set streams
+    //
+
+    DataInputStream in = new DataInputStream(
+        new BufferedInputStream(peer.getInputStream(), bufferSize));
+
+    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+        PBHelperClient.vintPrefixed(in));
+    RemoteBlockReader2.checkSuccess(status, peer, block, file);
+    ReadOpChecksumInfoProto checksumInfo =
+        status.getReadOpChecksumInfo();
+    DataChecksum checksum = DataTransferProtoUtil.fromProto(
+        checksumInfo.getChecksum());
+    //Warning when we get CHECKSUM_NULL?
+
+    // Read the first chunk offset.
+    long firstChunkOffset = checksumInfo.getChunkOffset();
+
+    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+      throw new IOException("BlockReader: error in first chunk offset (" +
+          firstChunkOffset + ") startOffset is " +
+          startOffset + " for file " + file);
+    }
+
+    return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
+        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
+        peer, datanodeID, peerCache);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    startOffset = -1;
+    checksum = null;
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
+    }
+
+    // in will be closed when its Socket is closed.
+  }
+
+  @Override
+  public void readFully(byte[] buf, int readOffset, int amtToRead)
+      throws IOException {
+    IOUtils.readFully(this, buf, readOffset, amtToRead);
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return readFully(this, buf, offset, len);
+  }
+
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Peer peer, Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + peer;
+    try {
+      RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
+      sentStatusCode = true;
+    } catch (IOException e) {
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+          peer.getRemoteAddressString() + ": " + e.getMessage());
+    }
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
+  }
+
+  @Override
+  public int available() throws IOException {
+    // An optimistic estimate of how much data is available
+    // to us without doing network I/O.
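+    // (This just returns RemoteBlockReader2.TCP_WINDOW_SIZE, a fixed
+    // 128 KB constant, rather than querying the connection.)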
+ return RemoteBlockReader2.TCP_WINDOW_SIZE; + } + + @Override + public boolean isLocal() { + return isLocal; + } + + @Override + public boolean isShortCircuit() { + return false; + } + + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java new file mode 100644 index 0000000..5541e6d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -0,0 +1,480 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.EnumSet; +import java.util.UUID; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.htrace.Sampler; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +import com.google.common.annotations.VisibleForTesting; + +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +/** + * This is a wrapper around connection to datanode + * and understands checksum, offset etc. + * + * Terminology: + * <dl> + * <dt>block</dt> + * <dd>The hdfs block, typically large (~64MB). + * </dd> + * <dt>chunk</dt> + * <dd>A block is divided into chunks, each comes with a checksum. + * We want transfers to be chunk-aligned, to be able to + * verify checksums. + * </dd> + * <dt>packet</dt> + * <dd>A grouping of chunks used for transport. It contains a + * header, followed by checksum data, followed by real data. + * </dd> + * </dl> + * Please see DataNode for the RPC specification. + * + * This is a new implementation introduced in Hadoop 0.23 which + * is more efficient and simpler than the older BlockReader + * implementation. It should be renamed to RemoteBlockReader + * once we are confident in it. + */ +@InterfaceAudience.Private +public class RemoteBlockReader2 implements BlockReader { + + static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class); + static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB; + + final private Peer peer; + final private DatanodeID datanodeID; + final private PeerCache peerCache; + final private long blockId; + private final ReadableByteChannel in; + + private DataChecksum checksum; + private final PacketReceiver packetReceiver = new PacketReceiver(true); + + private ByteBuffer curDataSlice = null; + + /** offset in block of the last chunk received */ + private long lastSeqNo = -1; + + /** offset in block where reader wants to actually read */ + private long startOffset; + private final String filename; + + private final int bytesPerChecksum; + private final int checksumSize; + + /** + * The total number of bytes we need to transfer from the DN. + * This is the amount that the user has requested plus some padding + * at the beginning so that the read can begin on a chunk boundary. + */ + private long bytesNeededToFinish; + + /** + * True if we are reading from a local DataNode. 
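+   * (Determined in the constructor by resolving the datanode's
+   * transfer address with DFSUtilClient.isLocalAddress().)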
+ */ + private final boolean isLocal; + + private final boolean verifyChecksum; + + private boolean sentStatusCode = false; + + @VisibleForTesting + public Peer getPeer() { + return peer; + } + + @Override + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + + UUID randomId = null; + if (LOG.isTraceEnabled()) { + randomId = UUID.randomUUID(); + LOG.trace(String.format("Starting read #%s file %s from datanode %s", + randomId.toString(), this.filename, + this.datanodeID.getHostName())); + } + + if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + TraceScope scope = Trace.startSpan( + "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER); + try { + readNextPacket(); + } finally { + scope.close(); + } + } + + if (LOG.isTraceEnabled()) { + LOG.trace(String.format("Finishing read #" + randomId)); + } + + if (curDataSlice.remaining() == 0) { + // we're at EOF now + return -1; + } + + int nRead = Math.min(curDataSlice.remaining(), len); + curDataSlice.get(buf, off, nRead); + + return nRead; + } + + + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + TraceScope scope = Trace.startSpan( + "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER); + try { + readNextPacket(); + } finally { + scope.close(); + } + } + if (curDataSlice.remaining() == 0) { + // we're at EOF now + return -1; + } + + int nRead = Math.min(curDataSlice.remaining(), buf.remaining()); + ByteBuffer writeSlice = curDataSlice.duplicate(); + writeSlice.limit(writeSlice.position() + nRead); + buf.put(writeSlice); + curDataSlice.position(writeSlice.position()); + + return nRead; + } + + private void readNextPacket() throws IOException { + //Read packet headers. + packetReceiver.receiveNextPacket(in); + + PacketHeader curHeader = packetReceiver.getHeader(); + curDataSlice = packetReceiver.getDataSlice(); + assert curDataSlice.capacity() == curHeader.getDataLen(); + + if (LOG.isTraceEnabled()) { + LOG.trace("DFSClient readNextPacket got header " + curHeader); + } + + // Sanity check the lengths + if (!curHeader.sanityCheck(lastSeqNo)) { + throw new IOException("BlockReader: error in packet header " + + curHeader); + } + + if (curHeader.getDataLen() > 0) { + int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; + int checksumsLen = chunks * checksumSize; + + assert packetReceiver.getChecksumSlice().capacity() == checksumsLen : + "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + + " checksumsLen=" + checksumsLen; + + lastSeqNo = curHeader.getSeqno(); + if (verifyChecksum && curDataSlice.remaining() > 0) { + // N.B.: the checksum error offset reported here is actually + // relative to the start of the block, not the start of the file. + // This is slightly misleading, but preserves the behavior from + // the older BlockReader. + checksum.verifyChunkedSums(curDataSlice, + packetReceiver.getChecksumSlice(), + filename, curHeader.getOffsetInBlock()); + } + bytesNeededToFinish -= curHeader.getDataLen(); + } + + // First packet will include some data prior to the first byte + // the user requested. Skip it. 
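+    // For example, with 512-byte chunks, a read starting at offset 700
+    // is served from chunk offset 512, so the slice is positioned 188
+    // bytes in before any data is handed back to the caller.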
+ if (curHeader.getOffsetInBlock() < startOffset) { + int newPos = (int) (startOffset - curHeader.getOffsetInBlock()); + curDataSlice.position(newPos); + } + + // If we've now satisfied the whole client read, read one last packet + // header, which should be empty + if (bytesNeededToFinish <= 0) { + readTrailingEmptyPacket(); + if (verifyChecksum) { + sendReadResult(Status.CHECKSUM_OK); + } else { + sendReadResult(Status.SUCCESS); + } + } + } + + @Override + public synchronized long skip(long n) throws IOException { + /* How can we make sure we don't throw a ChecksumException, at least + * in majority of the cases?. This one throws. */ + long skipped = 0; + while (skipped < n) { + long needToSkip = n - skipped; + if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + readNextPacket(); + } + if (curDataSlice.remaining() == 0) { + // we're at EOF now + break; + } + + int skip = (int)Math.min(curDataSlice.remaining(), needToSkip); + curDataSlice.position(curDataSlice.position() + skip); + skipped += skip; + } + return skipped; + } + + private void readTrailingEmptyPacket() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Reading empty packet at end of read"); + } + + packetReceiver.receiveNextPacket(in); + + PacketHeader trailer = packetReceiver.getHeader(); + if (!trailer.isLastPacketInBlock() || + trailer.getDataLen() != 0) { + throw new IOException("Expected empty end-of-read packet! Header: " + + trailer); + } + } + + protected RemoteBlockReader2(String file, String bpid, long blockId, + DataChecksum checksum, boolean verifyChecksum, + long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, + DatanodeID datanodeID, PeerCache peerCache) { + this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. + createSocketAddr(datanodeID.getXferAddr())); + // Path is used only for printing block and file information in debug + this.peer = peer; + this.datanodeID = datanodeID; + this.in = peer.getInputStreamChannel(); + this.checksum = checksum; + this.verifyChecksum = verifyChecksum; + this.startOffset = Math.max( startOffset, 0 ); + this.filename = file; + this.peerCache = peerCache; + this.blockId = blockId; + + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. + this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + } + + + @Override + public synchronized void close() throws IOException { + packetReceiver.close(); + startOffset = -1; + checksum = null; + if (peerCache != null && sentStatusCode) { + peerCache.put(datanodeID, peer); + } else { + peer.close(); + } + + // in will be closed when its Socket is closed. + } + + /** + * When the reader reaches end of the read, it sends a status response + * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN + * closing our connection (which we will re-open), but won't affect + * data correctness. + */ + void sendReadResult(Status statusCode) { + assert !sentStatusCode : "already sent status code to " + peer; + try { + writeReadResult(peer.getOutputStream(), statusCode); + sentStatusCode = true; + } catch (IOException e) { + // It's ok not to be able to send this. But something is probably wrong. 
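+      // In that case the peer is simply not reused: sentStatusCode
+      // stays false, so close() will tear the connection down instead
+      // of returning it to the cache.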
+ LOG.info("Could not send read status (" + statusCode + ") to datanode " + + peer.getRemoteAddressString() + ": " + e.getMessage()); + } + } + + /** + * Serialize the actual read result on the wire. + */ + static void writeReadResult(OutputStream out, Status statusCode) + throws IOException { + + ClientReadStatusProto.newBuilder() + .setStatus(statusCode) + .build() + .writeDelimitedTo(out); + + out.flush(); + } + + /** + * File name to print when accessing a block directly (from servlets) + * @param s Address of the block location + * @param poolId Block pool ID of the block + * @param blockId Block ID of the block + * @return string that has a file name for debug purposes + */ + public static String getFileName(final InetSocketAddress s, + final String poolId, final long blockId) { + return s.toString() + ":" + poolId + ":" + blockId; + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public void readFully(byte[] buf, int off, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, off, len); + } + + /** + * Create a new BlockReader specifically to satisfy a read. + * This method also sends the OP_READ_BLOCK request. + * + * @param file File location + * @param block The block object + * @param blockToken The block token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @param peer The Peer to use + * @param datanodeID The DatanodeID this peer is connected to + * @return New BlockReader instance, or null on error. + */ + public static BlockReader newBlockReader(String file, + ExtendedBlock block, + Token<BlockTokenIdentifier> blockToken, + long startOffset, long len, + boolean verifyChecksum, + String clientName, + Peer peer, DatanodeID datanodeID, + PeerCache peerCache, + CachingStrategy cachingStrategy) throws IOException { + // in and out will be closed when sock is closed (by the caller) + final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( + peer.getOutputStream())); + new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, + verifyChecksum, cachingStrategy); + + // + // Get bytes in block + // + DataInputStream in = new DataInputStream(peer.getInputStream()); + + BlockOpResponseProto status = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(in)); + checkSuccess(status, peer, block, file); + ReadOpChecksumInfoProto checksumInfo = + status.getReadOpChecksumInfo(); + DataChecksum checksum = DataTransferProtoUtil.fromProto( + checksumInfo.getChecksum()); + //Warning when we get CHECKSUM_NULL? + + // Read the first chunk offset. 
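+    // firstChunkOffset is the chunk-aligned offset at which the
+    // datanode actually starts sending; the check below requires
+    // startOffset - bytesPerChecksum < firstChunkOffset <= startOffset.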
+ long firstChunkOffset = checksumInfo.getChunkOffset(); + + if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || + firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { + throw new IOException("BlockReader: error in first chunk offset (" + + firstChunkOffset + ") startOffset is " + + startOffset + " for file " + file); + } + + return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), + checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, + datanodeID, peerCache); + } + + static void checkSuccess( + BlockOpResponseProto status, Peer peer, + ExtendedBlock block, String file) + throws IOException { + String logInfo = "for OP_READ_BLOCK" + + ", self=" + peer.getLocalAddressString() + + ", remote=" + peer.getRemoteAddressString() + + ", for file " + file + + ", for pool " + block.getBlockPoolId() + + " block " + block.getBlockId() + "_" + block.getGenerationStamp(); + DataTransferProtoUtil.checkBlockOpStatus(status, logInfo); + } + + @Override + public int available() throws IOException { + // An optimistic estimate of how much data is available + // to us without doing network I/O. + return TCP_WINDOW_SIZE; + } + + @Override + public boolean isLocal() { + return isLocal; + } + + @Override + public boolean isShortCircuit() { + return false; + } + + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java new file mode 100644 index 0000000..c9966a7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto; +import org.apache.hadoop.hdfs.util.ByteBufferOutputStream; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Shorts; +import com.google.common.primitives.Ints; +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * Header data for each packet that goes through the read/write pipelines. + * Includes all of the information about the packet, excluding checksums and + * actual data. + * + * This data includes: + * - the offset in bytes into the HDFS block of the data in this packet + * - the sequence number of this packet in the pipeline + * - whether or not this is the last packet in the pipeline + * - the length of the data in this packet + * - whether or not this packet should be synced by the DNs. + * + * When serialized, this header is written out as a protocol buffer, preceded + * by a 4-byte integer representing the full packet length, and a 2-byte short + * representing the header length. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class PacketHeader { + private static final int MAX_PROTO_SIZE = + PacketHeaderProto.newBuilder() + .setOffsetInBlock(0) + .setSeqno(0) + .setLastPacketInBlock(false) + .setDataLen(0) + .setSyncBlock(false) + .build().getSerializedSize(); + public static final int PKT_LENGTHS_LEN = + Ints.BYTES + Shorts.BYTES; + public static final int PKT_MAX_HEADER_LEN = + PKT_LENGTHS_LEN + MAX_PROTO_SIZE; + + private int packetLen; + private PacketHeaderProto proto; + + public PacketHeader() { + } + + public PacketHeader(int packetLen, long offsetInBlock, long seqno, + boolean lastPacketInBlock, int dataLen, boolean syncBlock) { + this.packetLen = packetLen; + Preconditions.checkArgument(packetLen >= Ints.BYTES, + "packet len %s should always be at least 4 bytes", + packetLen); + + PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder() + .setOffsetInBlock(offsetInBlock) + .setSeqno(seqno) + .setLastPacketInBlock(lastPacketInBlock) + .setDataLen(dataLen); + + if (syncBlock) { + // Only set syncBlock if it is specified. + // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721 + // because it changes the length of the packet header, and BlockReceiver + // in that version did not support variable-length headers. 
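+      // (Leaving the field unset when false keeps the serialized
+      // header the same length as what those older receivers expect.)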
+ builder.setSyncBlock(syncBlock); + } + + proto = builder.build(); + } + + public int getDataLen() { + return proto.getDataLen(); + } + + public boolean isLastPacketInBlock() { + return proto.getLastPacketInBlock(); + } + + public long getSeqno() { + return proto.getSeqno(); + } + + public long getOffsetInBlock() { + return proto.getOffsetInBlock(); + } + + public int getPacketLen() { + return packetLen; + } + + public boolean getSyncBlock() { + return proto.getSyncBlock(); + } + + @Override + public String toString() { + return "PacketHeader with packetLen=" + packetLen + + " header data: " + + proto.toString(); + } + + public void setFieldsFromData( + int packetLen, byte[] headerData) throws InvalidProtocolBufferException { + this.packetLen = packetLen; + proto = PacketHeaderProto.parseFrom(headerData); + } + + public void readFields(ByteBuffer buf) throws IOException { + packetLen = buf.getInt(); + short protoLen = buf.getShort(); + byte[] data = new byte[protoLen]; + buf.get(data); + proto = PacketHeaderProto.parseFrom(data); + } + + public void readFields(DataInputStream in) throws IOException { + this.packetLen = in.readInt(); + short protoLen = in.readShort(); + byte[] data = new byte[protoLen]; + in.readFully(data); + proto = PacketHeaderProto.parseFrom(data); + } + + /** + * @return the number of bytes necessary to write out this header, + * including the length-prefixing of the payload and header + */ + public int getSerializedSize() { + return PKT_LENGTHS_LEN + proto.getSerializedSize(); + } + + /** + * Write the header into the buffer. + * This requires that PKT_HEADER_LEN bytes are available. + */ + public void putInBuffer(final ByteBuffer buf) { + assert proto.getSerializedSize() <= MAX_PROTO_SIZE + : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize(); + try { + buf.putInt(packetLen); + buf.putShort((short) proto.getSerializedSize()); + proto.writeTo(new ByteBufferOutputStream(buf)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void write(DataOutputStream out) throws IOException { + assert proto.getSerializedSize() <= MAX_PROTO_SIZE + : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize(); + out.writeInt(packetLen); + out.writeShort(proto.getSerializedSize()); + proto.writeTo(out); + } + + public byte[] getBytes() { + ByteBuffer buf = ByteBuffer.allocate(getSerializedSize()); + putInBuffer(buf); + return buf.array(); + } + + /** + * Perform a sanity check on the packet, returning true if it is sane. + * @param lastSeqNo the previous sequence number received - we expect the current + * sequence number to be larger by 1. 
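+   * For example, after a packet with seqno 5, only a header carrying
+   * seqno 6 passes the check.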
+ */ + public boolean sanityCheck(long lastSeqNo) { + // We should only have a non-positive data length for the last packet + if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false; + // The last packet should not contain data + if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false; + // Seqnos should always increase by 1 with each packet received + if (proto.getSeqno() != lastSeqNo + 1) return false; + return true; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof PacketHeader)) return false; + PacketHeader other = (PacketHeader)o; + return this.proto.equals(other.proto); + } + + @Override + public int hashCode() { + return (int)proto.getSeqno(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java new file mode 100644 index 0000000..c4093b1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.Closeable; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.util.DirectBufferPool; +import org.apache.hadoop.io.IOUtils; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to handle reading packets one-at-a-time from the wire. + * These packets are used both for reading and writing data to/from + * DataNodes. + */ +@InterfaceAudience.Private +public class PacketReceiver implements Closeable { + + /** + * The max size of any single packet. This prevents OOMEs when + * invalid data is sent. + */ + private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024; + + static final Logger LOG = LoggerFactory.getLogger(PacketReceiver.class); + + private static final DirectBufferPool bufferPool = new DirectBufferPool(); + private final boolean useDirectBuffers; + + /** + * The entirety of the most recently read packet. + * The first PKT_LENGTHS_LEN bytes of this buffer are the + * length prefixes. 
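+   * (PKT_LENGTHS_LEN is Ints.BYTES + Shorts.BYTES = 6: the 4-byte
+   * packet length followed by the 2-byte header length.)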
+ */ + private ByteBuffer curPacketBuf = null; + + /** + * A slice of {@link #curPacketBuf} which contains just the checksums. + */ + private ByteBuffer curChecksumSlice = null; + + /** + * A slice of {@link #curPacketBuf} which contains just the data. + */ + private ByteBuffer curDataSlice = null; + + /** + * The packet header of the most recently read packet. + */ + private PacketHeader curHeader; + + public PacketReceiver(boolean useDirectBuffers) { + this.useDirectBuffers = useDirectBuffers; + reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN); + } + + public PacketHeader getHeader() { + return curHeader; + } + + public ByteBuffer getDataSlice() { + return curDataSlice; + } + + public ByteBuffer getChecksumSlice() { + return curChecksumSlice; + } + + /** + * Reads all of the data for the next packet into the appropriate buffers. + * + * The data slice and checksum slice members will be set to point to the + * user data and corresponding checksums. The header will be parsed and + * set. + */ + public void receiveNextPacket(ReadableByteChannel in) throws IOException { + doRead(in, null); + } + + /** + * @see #receiveNextPacket(ReadableByteChannel) + */ + public void receiveNextPacket(InputStream in) throws IOException { + doRead(null, in); + } + + private void doRead(ReadableByteChannel ch, InputStream in) + throws IOException { + // Each packet looks like: + // PLEN HLEN HEADER CHECKSUMS DATA + // 32-bit 16-bit <protobuf> <variable length> + // + // PLEN: Payload length + // = length(PLEN) + length(CHECKSUMS) + length(DATA) + // This length includes its own encoded length in + // the sum for historical reasons. + // + // HLEN: Header length + // = length(HEADER) + // + // HEADER: the actual packet header fields, encoded in protobuf + // CHECKSUMS: the crcs for the data chunk. May be missing if + // checksums were not requested + // DATA the actual block data + Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock()); + + curPacketBuf.clear(); + curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN); + doReadFully(ch, in, curPacketBuf); + curPacketBuf.flip(); + int payloadLen = curPacketBuf.getInt(); + + if (payloadLen < Ints.BYTES) { + // The "payload length" includes its own length. Therefore it + // should never be less than 4 bytes + throw new IOException("Invalid payload length " + + payloadLen); + } + int dataPlusChecksumLen = payloadLen - Ints.BYTES; + int headerLen = curPacketBuf.getShort(); + if (headerLen < 0) { + throw new IOException("Invalid header length " + headerLen); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen + + " headerLen = " + headerLen); + } + + // Sanity check the buffer size so we don't allocate too much memory + // and OOME. + int totalLen = payloadLen + headerLen; + if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) { + throw new IOException("Incorrect value for packet payload size: " + + payloadLen); + } + + // Make sure we have space for the whole packet, and + // read it. 
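+    // The 6 length-prefix bytes already read remain at the front of
+    // curPacketBuf (reallocPacketBuf copies them over on a realloc);
+    // the header, checksums, and data land after them, and
+    // reslicePacket() later carves the buffer into slices.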
+ reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN + + dataPlusChecksumLen + headerLen); + curPacketBuf.clear(); + curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN); + curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN + + dataPlusChecksumLen + headerLen); + doReadFully(ch, in, curPacketBuf); + curPacketBuf.flip(); + curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN); + + // Extract the header from the front of the buffer (after the length prefixes) + byte[] headerBuf = new byte[headerLen]; + curPacketBuf.get(headerBuf); + if (curHeader == null) { + curHeader = new PacketHeader(); + } + curHeader.setFieldsFromData(payloadLen, headerBuf); + + // Compute the sub-slices of the packet + int checksumLen = dataPlusChecksumLen - curHeader.getDataLen(); + if (checksumLen < 0) { + throw new IOException("Invalid packet: data length in packet header " + + "exceeds data length received. dataPlusChecksumLen=" + + dataPlusChecksumLen + " header: " + curHeader); + } + + reslicePacket(headerLen, checksumLen, curHeader.getDataLen()); + } + + /** + * Rewrite the last-read packet on the wire to the given output stream. + */ + public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException { + Preconditions.checkState(!useDirectBuffers, + "Currently only supported for non-direct buffers"); + mirrorOut.write(curPacketBuf.array(), + curPacketBuf.arrayOffset(), + curPacketBuf.remaining()); + } + + + private static void doReadFully(ReadableByteChannel ch, InputStream in, + ByteBuffer buf) throws IOException { + if (ch != null) { + readChannelFully(ch, buf); + } else { + Preconditions.checkState(!buf.isDirect(), + "Must not use direct buffers with InputStream API"); + IOUtils.readFully(in, buf.array(), + buf.arrayOffset() + buf.position(), + buf.remaining()); + buf.position(buf.position() + buf.remaining()); + } + } + + private void reslicePacket( + int headerLen, int checksumsLen, int dataLen) { + // Packet structure (refer to doRead() for details): + // PLEN HLEN HEADER CHECKSUMS DATA + // 32-bit 16-bit <protobuf> <variable length> + // |--- lenThroughHeader ----| + // |----------- lenThroughChecksums ----| + // |------------------- lenThroughData ------| + int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen; + int lenThroughChecksums = lenThroughHeader + checksumsLen; + int lenThroughData = lenThroughChecksums + dataLen; + + assert dataLen >= 0 : "invalid datalen: " + dataLen; + assert curPacketBuf.position() == lenThroughHeader; + assert curPacketBuf.limit() == lenThroughData : + "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen + + " rem=" + curPacketBuf.remaining(); + + // Slice the checksums. + curPacketBuf.position(lenThroughHeader); + curPacketBuf.limit(lenThroughChecksums); + curChecksumSlice = curPacketBuf.slice(); + + // Slice the data. + curPacketBuf.position(lenThroughChecksums); + curPacketBuf.limit(lenThroughData); + curDataSlice = curPacketBuf.slice(); + + // Reset buffer to point to the entirety of the packet (including + // length prefixes) + curPacketBuf.position(0); + curPacketBuf.limit(lenThroughData); + } + + + private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf) + throws IOException { + while (buf.remaining() > 0) { + int n = ch.read(buf); + if (n < 0) { + throw new IOException("Premature EOF reading from " + ch); + } + } + } + + private void reallocPacketBuf(int atLeastCapacity) { + // Realloc the buffer if this packet is longer than the previous + // one. 
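+    // The buffer only ever grows across packets; a direct buffer is
+    // handed back to the pool only in returnPacketBufToPool().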
+ if (curPacketBuf == null || + curPacketBuf.capacity() < atLeastCapacity) { + ByteBuffer newBuf; + if (useDirectBuffers) { + newBuf = bufferPool.getBuffer(atLeastCapacity); + } else { + newBuf = ByteBuffer.allocate(atLeastCapacity); + } + // If reallocing an existing buffer, copy the old packet length + // prefixes over + if (curPacketBuf != null) { + curPacketBuf.flip(); + newBuf.put(curPacketBuf); + } + + returnPacketBufToPool(); + curPacketBuf = newBuf; + } + } + + private void returnPacketBufToPool() { + if (curPacketBuf != null && curPacketBuf.isDirect()) { + bufferPool.returnBuffer(curPacketBuf); + curPacketBuf = null; + } + } + + @Override // Closeable + public void close() { + returnPacketBufToPool(); + } + + @Override + protected void finalize() throws Throwable { + try { + // just in case it didn't get closed, we + // may as well still try to return the buffer + returnPacketBufToPool(); + } finally { + super.finalize(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java new file mode 100644 index 0000000..31d4dcc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * OutputStream that writes into a {@link ByteBuffer}. 
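+ * Writes that exceed the buffer's remaining capacity fail with a
+ * runtime BufferOverflowException, so callers pre-size the buffer
+ * (see PacketHeader#getBytes()).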
+ */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class ByteBufferOutputStream extends OutputStream { + + private final ByteBuffer buf; + + public ByteBufferOutputStream(ByteBuffer buf) { + this.buf = buf; + } + + @Override + public void write(int b) throws IOException { + buf.put((byte)b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + buf.put(b, off, len); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7b5979e..ef8fac5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -867,6 +867,9 @@ Release 2.8.0 - UNRELEASED HDFS-8980. Remove unnecessary block replacement in INodeFile. (jing9) + HDFS-8990. Move RemoteBlockReader to hdfs-client module. + (Mingliang via wheat9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 3c49ef7..268a5b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -203,7 +203,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, DataEncryptionKeyFactory { public static final Log LOG = LogFactory.getLog(DFSClient.class); public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour - static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB private final Configuration conf; private final DfsClientConf dfsClientConf; http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java deleted file mode 100644 index 015e154..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ /dev/null @@ -1,508 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7b5979e..ef8fac5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -867,6 +867,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8980. Remove unnecessary block replacement in INodeFile. (jing9)
 
+    HDFS-8990. Move RemoteBlockReader to hdfs-client module.
+    (Mingliang via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 3c49ef7..268a5b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -203,7 +203,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     DataEncryptionKeyFactory {
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
-  static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
 
   private final Configuration conf;
   private final DfsClientConf dfsClientConf;
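The removed TCP_WINDOW_SIZE constant was consumed by the block readers' available() (see the deleted RemoteBlockReader.available() below, which returns it as a fixed 128 KB estimate), so it travels with them out of DFSClient. A sketch of how a caller should treat that estimate, assuming only the read/available signatures of the org.apache.hadoop.hdfs.BlockReader interface:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.BlockReader;

    class AvailableHint {
      // available() is an optimistic, network-free estimate (a fixed
      // 128 KB in the old reader); never treat it as an exact count.
      static int readSome(BlockReader reader, byte[] dest) throws IOException {
        int hint = Math.min(reader.available(), dest.length);
        return reader.read(dest, 0, hint);  // may return fewer bytes
      }
    }
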
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
deleted file mode 100644
index 015e154..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumSet;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-
-/**
- * @deprecated this is an old implementation that is being left around
- * in case any issues spring up with the new {@link RemoteBlockReader2} implementation.
- * It will be removed in the next release.
- */
-@InterfaceAudience.Private
-@Deprecated
-public class RemoteBlockReader extends FSInputChecker implements BlockReader {
-  private final Peer peer;
-  private final DatanodeID datanodeID;
-  private final DataInputStream in;
-  private DataChecksum checksum;
-
-  /** offset in block of the last chunk received */
-  private long lastChunkOffset = -1;
-  private long lastChunkLen = -1;
-  private long lastSeqNo = -1;
-
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
-
-  private final long blockId;
-
-  /** offset in block of of first chunk - may be less than startOffset
-      if startOffset is not chunk-aligned */
-  private final long firstChunkOffset;
-
-  private final int bytesPerChecksum;
-  private final int checksumSize;
-
-  /**
-   * The total number of bytes we need to transfer from the DN.
-   * This is the amount that the user has requested plus some padding
-   * at the beginning so that the read can begin on a chunk boundary.
-   */
-  private final long bytesNeededToFinish;
-
-  /**
-   * True if we are reading from a local DataNode.
-   */
-  private final boolean isLocal;
-
-  private boolean eos = false;
-  private boolean sentStatusCode = false;
-
-  ByteBuffer checksumBytes = null;
-  /** Amount of unread data in the current received packet */
-  int dataLeft = 0;
-
-  private final PeerCache peerCache;
-
-  /* FSInputChecker interface */
-
-  /* same interface as inputStream java.io.InputStream#read()
-   * used by DFSInputStream#read()
-   * This violates one rule when there is a checksum error:
-   * "Read should not modify user buffer before successful read"
-   * because it first reads the data to user buffer and then checks
-   * the checksum.
-   */
-  @Override
-  public synchronized int read(byte[] buf, int off, int len)
-                               throws IOException {
-
-    // This has to be set here, *before* the skip, since we can
-    // hit EOS during the skip, in the case that our entire read
-    // is smaller than the checksum chunk.
-    boolean eosBefore = eos;
-
-    //for the first read, skip the extra bytes at the front.
-    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
-      // Skip these bytes. But don't call this.skip()!
-      int toSkip = (int)(startOffset - firstChunkOffset);
-      if ( super.readAndDiscard(toSkip) != toSkip ) {
-        // should never happen
-        throw new IOException("Could not skip required number of bytes");
-      }
-    }
-
-    int nRead = super.read(buf, off, len);
-
-    // if eos was set in the previous read, send a status code to the DN
-    if (eos && !eosBefore && nRead >= 0) {
-      if (needChecksum()) {
-        sendReadResult(peer, Status.CHECKSUM_OK);
-      } else {
-        sendReadResult(peer, Status.SUCCESS);
-      }
-    }
-    return nRead;
-  }
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in majority of the cases?. This one throws. */
-    long nSkipped = 0;
-    while (nSkipped < n) {
-      int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
-      int ret = readAndDiscard(toSkip);
-      if (ret <= 0) {
-        return nSkipped;
-      }
-      nSkipped += ret;
-    }
-    return nSkipped;
-  }
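The skip() above is implemented as read-and-discard so that skipped byte ranges still pass through checksum verification. A standalone sketch of the same loop shape over a plain InputStream; readAndDiscard is an FSInputChecker internal, so the scratch-buffer read here stands in for it:

    import java.io.IOException;
    import java.io.InputStream;

    class SkipByReading {
      // Discard up to n bytes by reading them, so any integrity checking
      // in read() still runs; returns how many bytes were actually skipped.
      static long skipFully(InputStream in, long n) throws IOException {
        byte[] scratch = new byte[8192];
        long skipped = 0;
        while (skipped < n) {
          int toRead = (int) Math.min(n - skipped, scratch.length);
          int ret = in.read(scratch, 0, toRead);
          if (ret <= 0) {
            return skipped;  // EOF or no progress: report a partial skip
          }
          skipped += ret;
        }
        return skipped;
      }
    }
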
-
-  @Override
-  public int read() throws IOException {
-    throw new IOException("read() is not expected to be invoked. " +
-                          "Use read(buf, off, len) instead.");
-  }
-
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    /* Checksum errors are handled outside the BlockReader.
-     * DFSInputStream does not always call 'seekToNewSource'. In the
-     * case of pread(), it just tries a different replica without seeking.
-     */
-    return false;
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    throw new IOException("Seek() is not supported in BlockInputChecker");
-  }
-
-  @Override
-  protected long getChunkPosition(long pos) {
-    throw new RuntimeException("getChunkPosition() is not supported, " +
-                               "since seek is not required");
-  }
-
-  /**
-   * Makes sure that checksumBytes has enough capacity
-   * and limit is set to the number of checksum bytes needed
-   * to be read.
-   */
-  private void adjustChecksumBytes(int dataLen) {
-    int requiredSize =
-      ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
-    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
-      checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
-    } else {
-      checksumBytes.clear();
-    }
-    checksumBytes.limit(requiredSize);
-  }
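The requiredSize expression in adjustChecksumBytes is a ceiling division: one checksum per chunk, counting a trailing partial chunk as a full one. For example, with the common bytesPerChecksum = 512 and checksumSize = 4 (CRC32), a 1000-byte packet covers ceil(1000/512) = 2 chunks and therefore needs 8 checksum bytes. A self-checking sketch of that arithmetic (the values are illustrative):

    class ChecksumSizing {
      public static void main(String[] args) {
        int bytesPerChecksum = 512;  // typical dfs.bytes-per-checksum
        int checksumSize = 4;        // a CRC32 checksum is 4 bytes
        int dataLen = 1000;          // data bytes in one packet
        // Same ceiling-division formula as adjustChecksumBytes():
        int requiredSize =
            ((dataLen + bytesPerChecksum - 1) / bytesPerChecksum) * checksumSize;
        System.out.println(requiredSize);  // prints 8: two chunks, 4 bytes each
      }
    }
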
-
-  @Override
-  protected synchronized int readChunk(long pos, byte[] buf, int offset,
-                                       int len, byte[] checksumBuf)
-                                       throws IOException {
-    TraceScope scope =
-        Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")",
-            Sampler.NEVER);
-    try {
-      return readChunkImpl(pos, buf, offset, len, checksumBuf);
-    } finally {
-      scope.close();
-    }
-  }
-
-  private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
-                                         int len, byte[] checksumBuf)
-                                         throws IOException {
-    // Read one chunk.
-    if (eos) {
-      // Already hit EOF
-      return -1;
-    }
-
-    // Read one DATA_CHUNK.
-    long chunkOffset = lastChunkOffset;
-    if ( lastChunkLen > 0 ) {
-      chunkOffset += lastChunkLen;
-    }
-
-    // pos is relative to the start of the first chunk of the read.
-    // chunkOffset is relative to the start of the block.
-    // This makes sure that the read passed from FSInputChecker is the
-    // for the same chunk we expect to be reading from the DN.
-    if ( (pos + firstChunkOffset) != chunkOffset ) {
-      throw new IOException("Mismatch in pos : " + pos + " + " +
-                            firstChunkOffset + " != " + chunkOffset);
-    }
-
-    // Read next packet if the previous packet has been read completely.
-    if (dataLeft <= 0) {
-      //Read packet headers.
-      PacketHeader header = new PacketHeader();
-      header.readFields(in);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("DFSClient readChunk got header " + header);
-      }
-
-      // Sanity check the lengths
-      if (!header.sanityCheck(lastSeqNo)) {
-        throw new IOException("BlockReader: error in packet header " +
-                              header);
-      }
-
-      lastSeqNo = header.getSeqno();
-      dataLeft = header.getDataLen();
-      adjustChecksumBytes(header.getDataLen());
-      if (header.getDataLen() > 0) {
-        IOUtils.readFully(in, checksumBytes.array(), 0,
-                          checksumBytes.limit());
-      }
-    }
-
-    // Sanity checks
-    assert len >= bytesPerChecksum;
-    assert checksum != null;
-    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
-
-
-    int checksumsToRead, bytesToRead;
-
-    if (checksumSize > 0) {
-
-      // How many chunks left in our packet - this is a ceiling
-      // since we may have a partial chunk at the end of the file
-      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
-
-      // How many chunks we can fit in databuffer
-      //  - note this is a floor since we always read full chunks
-      int chunksCanFit = Math.min(len / bytesPerChecksum,
-                                  checksumBuf.length / checksumSize);
-
-      // How many chunks should we read
-      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
-      // How many bytes should we actually read
-      bytesToRead = Math.min(
-        checksumsToRead * bytesPerChecksum, // full chunks
-        dataLeft); // in case we have a partial
-    } else {
-      // no checksum
-      bytesToRead = Math.min(dataLeft, len);
-      checksumsToRead = 0;
-    }
-
-    if ( bytesToRead > 0 ) {
-      // Assert we have enough space
-      assert bytesToRead <= len;
-      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
-      assert checksumBuf.length >= checksumSize * checksumsToRead;
-      IOUtils.readFully(in, buf, offset, bytesToRead);
-      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
-    }
-
-    dataLeft -= bytesToRead;
-    assert dataLeft >= 0;
-
-    lastChunkOffset = chunkOffset;
-    lastChunkLen = bytesToRead;
-
-    // If there's no data left in the current packet after satisfying
-    // this read, and we have satisfied the client read, we expect
-    // an empty packet header from the DN to signify this.
-    // Note that pos + bytesToRead may in fact be greater since the
-    // DN finishes off the entire last chunk.
-    if (dataLeft == 0 &&
-        pos + bytesToRead >= bytesNeededToFinish) {
-
-      // Read header
-      PacketHeader hdr = new PacketHeader();
-      hdr.readFields(in);
-
-      if (!hdr.isLastPacketInBlock() ||
-          hdr.getDataLen() != 0) {
-        throw new IOException("Expected empty end-of-read packet! Header: " +
-                              hdr);
-      }
-
-      eos = true;
-    }
-
-    if ( bytesToRead == 0 ) {
-      return -1;
-    }
-
-    return bytesToRead;
-  }
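The ceiling/floor pair in readChunkImpl decides how much of the current packet one call consumes. Worked numbers (illustrative only): with bytesPerChecksum = 512, checksumSize = 4, dataLeft = 1000, len = 4096 and checksumBuf.length = 32, we get chunksLeft = ceil(1000/512) = 2 and chunksCanFit = min(4096/512, 32/4) = 8, so checksumsToRead = 2 and bytesToRead = min(2*512, 1000) = 1000, draining the packet including its partial last chunk. A self-checking sketch:

    class ChunkAccounting {
      public static void main(String[] args) {
        int bytesPerChecksum = 512, checksumSize = 4;
        int dataLeft = 1000;       // unread data bytes in the packet
        int len = 4096;            // room in the user-facing buffer
        int checksumBufLen = 32;   // room for checksum bytes
        int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;    // ceil -> 2
        int chunksCanFit = Math.min(len / bytesPerChecksum,
            checksumBufLen / checksumSize);                        // floor -> 8
        int checksumsToRead = Math.min(chunksLeft, chunksCanFit);  // 2
        int bytesToRead = Math.min(checksumsToRead * bytesPerChecksum,
            dataLeft);                                             // 1000
        System.out.println(checksumsToRead + " checksums, "
            + bytesToRead + " bytes");
      }
    }
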
-
-  private RemoteBlockReader(String file, String bpid, long blockId,
-      DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache) {
-    // Path is used only for printing block and file information in debug
-    super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
-                   ":" + bpid + ":of:"+ file)/*too non path-like?*/,
-          1, verifyChecksum,
-          checksum.getChecksumSize() > 0? checksum : null,
-          checksum.getBytesPerChecksum(),
-          checksum.getChecksumSize());
-
-    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
-        createSocketAddr(datanodeID.getXferAddr()));
-
-    this.peer = peer;
-    this.datanodeID = datanodeID;
-    this.in = in;
-    this.checksum = checksum;
-    this.startOffset = Math.max( startOffset, 0 );
-    this.blockId = blockId;
-
-    // The total number of bytes that we need to transfer from the DN is
-    // the amount that the user wants (bytesToRead), plus the padding at
-    // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-
-    this.firstChunkOffset = firstChunkOffset;
-    lastChunkOffset = firstChunkOffset;
-    lastChunkLen = -1;
-
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-    this.peerCache = peerCache;
-  }
-
-  /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   *
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @return New BlockReader instance, or null on error.
-   */
-  public static RemoteBlockReader newBlockReader(String file,
-                                     ExtendedBlock block,
-                                     Token<BlockTokenIdentifier> blockToken,
-                                     long startOffset, long len,
-                                     int bufferSize, boolean verifyChecksum,
-                                     String clientName, Peer peer,
-                                     DatanodeID datanodeID,
-                                     PeerCache peerCache,
-                                     CachingStrategy cachingStrategy)
-                                       throws IOException {
-    // in and out will be closed when sock is closed (by the caller)
-    final DataOutputStream out =
-        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum, cachingStrategy);
-
-    //
-    // Get bytes in block, set streams
-    //
-
-    DataInputStream in = new DataInputStream(
-        new BufferedInputStream(peer.getInputStream(), bufferSize));
-
-    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
-    RemoteBlockReader2.checkSuccess(status, peer, block, file);
-    ReadOpChecksumInfoProto checksumInfo =
-      status.getReadOpChecksumInfo();
-    DataChecksum checksum = DataTransferProtoUtil.fromProto(
-        checksumInfo.getChecksum());
-    //Warning when we get CHECKSUM_NULL?
-
-    // Read the first chunk offset.
-    long firstChunkOffset = checksumInfo.getChunkOffset();
-
-    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
-      throw new IOException("BlockReader: error in first chunk offset (" +
-                            firstChunkOffset + ") startOffset is " +
-                            startOffset + " for file " + file);
-    }
-
-    return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
-        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
-        peer, datanodeID, peerCache);
-  }
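The first-chunk-offset validation in newBlockReader accepts exactly one value per startOffset: the DN must begin at the start of the chunk containing startOffset. With bytesPerChecksum = 512 and startOffset = 1000 (illustrative numbers), the only legal firstChunkOffset is 512; 0 fails the "<= startOffset - bytesPerChecksum" test and 1024 fails "> startOffset". A self-checking sketch of the same predicate:

    class FirstChunkOffsetCheck {
      public static void main(String[] args) {
        long startOffset = 1000;
        int bytesPerChecksum = 512;
        for (long firstChunkOffset : new long[] {0, 512, 1024}) {
          boolean bad = firstChunkOffset < 0
              || firstChunkOffset > startOffset
              || firstChunkOffset <= (startOffset - bytesPerChecksum);
          // Only 512, i.e. startOffset rounded down to a chunk boundary, passes.
          System.out.println(firstChunkOffset + (bad ? " rejected" : " accepted"));
        }
      }
    }
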
-
-  @Override
-  public synchronized void close() throws IOException {
-    startOffset = -1;
-    checksum = null;
-    if (peerCache != null & sentStatusCode) {
-      peerCache.put(datanodeID, peer);
-    } else {
-      peer.close();
-    }
-
-    // in will be closed when its Socket is closed.
-  }
-
-  @Override
-  public void readFully(byte[] buf, int readOffset, int amtToRead)
-      throws IOException {
-    IOUtils.readFully(this, buf, readOffset, amtToRead);
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return readFully(this, buf, offset, len);
-  }
-
-  /**
-   * When the reader reaches end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Peer peer, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + peer;
-    try {
-      RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's ok not to be able to send this. But something is probably wrong.
-      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               peer.getRemoteAddressString() + ": " + e.getMessage());
-    }
-  }
-
-  @Override
-  public int read(ByteBuffer buf) throws IOException {
-    throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
-  }
-
-  @Override
-  public int available() throws IOException {
-    // An optimistic estimate of how much data is available
-    // to us without doing network I/O.
-    return DFSClient.TCP_WINDOW_SIZE;
-  }
-
-  @Override
-  public boolean isLocal() {
-    return isLocal;
-  }
-
-  @Override
-  public boolean isShortCircuit() {
-    return false;
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    return null;
-  }
-}