http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java deleted file mode 100644 index 78325a3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java +++ /dev/null @@ -1,647 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.shortcircuit; - -import java.io.FileInputStream; -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.BitSet; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.Random; - -import org.apache.commons.lang.builder.EqualsBuilder; -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.hadoop.fs.InvalidRequestException; -import org.apache.hadoop.hdfs.ExtendedBlockId; -import org.apache.hadoop.io.nativeio.NativeIO; -import org.apache.hadoop.io.nativeio.NativeIO.POSIX; -import org.apache.hadoop.util.Shell; -import org.apache.hadoop.util.StringUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import sun.misc.Unsafe; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ComparisonChain; -import com.google.common.primitives.Ints; - -/** - * A shared memory segment used to implement short-circuit reads. - */ -public class ShortCircuitShm { - private static final Logger LOG = LoggerFactory.getLogger(ShortCircuitShm.class); - - protected static final int BYTES_PER_SLOT = 64; - - private static final Unsafe unsafe = safetyDance(); - - private static Unsafe safetyDance() { - try { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe)f.get(null); - } catch (Throwable e) { - LOG.error("failed to load misc.Unsafe", e); - } - return null; - } - - /** - * Calculate the usable size of a shared memory segment. - * We round down to a multiple of the slot size and do some validation. - * - * @param stream The stream we're using. - * @return The usable size of the shared memory segment. 
- */ - private static int getUsableLength(FileInputStream stream) - throws IOException { - int intSize = Ints.checkedCast(stream.getChannel().size()); - int slots = intSize / BYTES_PER_SLOT; - if (slots == 0) { - throw new IOException("size of shared memory segment was " + - intSize + ", but that is not enough to hold even one slot."); - } - return slots * BYTES_PER_SLOT; - } - - /** - * Identifies a DfsClientShm. - */ - public static class ShmId implements Comparable<ShmId> { - private static final Random random = new Random(); - private final long hi; - private final long lo; - - /** - * Generate a random ShmId. - * - * We generate ShmIds randomly to prevent a malicious client from - * successfully guessing one and using that to interfere with another - * client. - */ - public static ShmId createRandom() { - return new ShmId(random.nextLong(), random.nextLong()); - } - - public ShmId(long hi, long lo) { - this.hi = hi; - this.lo = lo; - } - - public long getHi() { - return hi; - } - - public long getLo() { - return lo; - } - - @Override - public boolean equals(Object o) { - if ((o == null) || (o.getClass() != this.getClass())) { - return false; - } - ShmId other = (ShmId)o; - return new EqualsBuilder(). - append(hi, other.hi). - append(lo, other.lo). - isEquals(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(). - append(this.hi). - append(this.lo). - toHashCode(); - } - - @Override - public String toString() { - return String.format("%016x%016x", hi, lo); - } - - @Override - public int compareTo(ShmId other) { - return ComparisonChain.start(). - compare(hi, other.hi). - compare(lo, other.lo). - result(); - } - }; - - /** - * Uniquely identifies a slot. 
- */ - public static class SlotId { - private final ShmId shmId; - private final int slotIdx; - - public SlotId(ShmId shmId, int slotIdx) { - this.shmId = shmId; - this.slotIdx = slotIdx; - } - - public ShmId getShmId() { - return shmId; - } - - public int getSlotIdx() { - return slotIdx; - } - - @Override - public boolean equals(Object o) { - if ((o == null) || (o.getClass() != this.getClass())) { - return false; - } - SlotId other = (SlotId)o; - return new EqualsBuilder(). - append(shmId, other.shmId). - append(slotIdx, other.slotIdx). - isEquals(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(). - append(this.shmId). - append(this.slotIdx). - toHashCode(); - } - - @Override - public String toString() { - return String.format("SlotId(%s:%d)", shmId.toString(), slotIdx); - } - } - - public class SlotIterator implements Iterator<Slot> { - int slotIdx = -1; - - @Override - public boolean hasNext() { - synchronized (ShortCircuitShm.this) { - return allocatedSlots.nextSetBit(slotIdx + 1) != -1; - } - } - - @Override - public Slot next() { - synchronized (ShortCircuitShm.this) { - int nextSlotIdx = allocatedSlots.nextSetBit(slotIdx + 1); - if (nextSlotIdx == -1) { - throw new NoSuchElementException(); - } - slotIdx = nextSlotIdx; - return slots[nextSlotIdx]; - } - } - - @Override - public void remove() { - throw new UnsupportedOperationException("SlotIterator " + - "doesn't support removal"); - } - } - - /** - * A slot containing information about a replica. - * - * The format is: - * word 0 - * bit 0:32 Slot flags (see below). - * bit 33:63 Anchor count. - * word 1:7 - * Reserved for future use, such as statistics. - * Padding is also useful for avoiding false sharing. - * - * Little-endian versus big-endian is not relevant here since both the client - * and the server reside on the same computer and use the same orientation. - */ - public class Slot { - /** - * Flag indicating that the slot is valid. 
- * - * The DFSClient sets this flag when it allocates a new slot within one of - * its shared memory regions. - * - * The DataNode clears this flag when the replica associated with this slot - * is no longer valid. The client itself also clears this flag when it - * believes that the DataNode is no longer using this slot to communicate. - */ - private static final long VALID_FLAG = 1L<<63; - - /** - * Flag indicating that the slot can be anchored. - */ - private static final long ANCHORABLE_FLAG = 1L<<62; - - /** - * The slot address in memory. - */ - private final long slotAddress; - - /** - * BlockId of the block this slot is used for. - */ - private final ExtendedBlockId blockId; - - Slot(long slotAddress, ExtendedBlockId blockId) { - this.slotAddress = slotAddress; - this.blockId = blockId; - } - - /** - * Get the short-circuit memory segment associated with this Slot. - * - * @return The enclosing short-circuit memory segment. - */ - public ShortCircuitShm getShm() { - return ShortCircuitShm.this; - } - - /** - * Get the ExtendedBlockId associated with this slot. - * - * @return The ExtendedBlockId of this slot. - */ - public ExtendedBlockId getBlockId() { - return blockId; - } - - /** - * Get the SlotId of this slot, containing both shmId and slotIdx. - * - * @return The SlotId of this slot. - */ - public SlotId getSlotId() { - return new SlotId(getShmId(), getSlotIdx()); - } - - /** - * Get the Slot index. - * - * @return The index of this slot. - */ - public int getSlotIdx() { - return Ints.checkedCast( - (slotAddress - baseAddress) / BYTES_PER_SLOT); - } - - /** - * Clear the slot. 
- */ - void clear() { - unsafe.putLongVolatile(null, this.slotAddress, 0); - } - - private boolean isSet(long flag) { - long prev = unsafe.getLongVolatile(null, this.slotAddress); - return (prev & flag) != 0; - } - - private void setFlag(long flag) { - long prev; - do { - prev = unsafe.getLongVolatile(null, this.slotAddress); - if ((prev & flag) != 0) { - return; - } - } while (!unsafe.compareAndSwapLong(null, this.slotAddress, - prev, prev | flag)); - } - - private void clearFlag(long flag) { - long prev; - do { - prev = unsafe.getLongVolatile(null, this.slotAddress); - if ((prev & flag) == 0) { - return; - } - } while (!unsafe.compareAndSwapLong(null, this.slotAddress, - prev, prev & (~flag))); - } - - public boolean isValid() { - return isSet(VALID_FLAG); - } - - public void makeValid() { - setFlag(VALID_FLAG); - } - - public void makeInvalid() { - clearFlag(VALID_FLAG); - } - - public boolean isAnchorable() { - return isSet(ANCHORABLE_FLAG); - } - - public void makeAnchorable() { - setFlag(ANCHORABLE_FLAG); - } - - public void makeUnanchorable() { - clearFlag(ANCHORABLE_FLAG); - } - - public boolean isAnchored() { - long prev = unsafe.getLongVolatile(null, this.slotAddress); - if ((prev & VALID_FLAG) == 0) { - // Slot is no longer valid. - return false; - } - return ((prev & 0x7fffffff) != 0); - } - - /** - * Try to add an anchor for a given slot. - * - * When a slot is anchored, we know that the block it refers to is resident - * in memory. - * - * @return True if the slot is anchored. - */ - public boolean addAnchor() { - long prev; - do { - prev = unsafe.getLongVolatile(null, this.slotAddress); - if ((prev & VALID_FLAG) == 0) { - // Slot is no longer valid. - return false; - } - if ((prev & ANCHORABLE_FLAG) == 0) { - // Slot can't be anchored right now. - return false; - } - if ((prev & 0x7fffffff) == 0x7fffffff) { - // Too many other threads have anchored the slot (2 billion?) 
- return false; - } - } while (!unsafe.compareAndSwapLong(null, this.slotAddress, - prev, prev + 1)); - return true; - } - - /** - * Remove an anchor for a given slot. - */ - public void removeAnchor() { - long prev; - do { - prev = unsafe.getLongVolatile(null, this.slotAddress); - Preconditions.checkState((prev & 0x7fffffff) != 0, - "Tried to remove anchor for slot " + slotAddress +", which was " + - "not anchored."); - } while (!unsafe.compareAndSwapLong(null, this.slotAddress, - prev, prev - 1)); - } - - @Override - public String toString() { - return "Slot(slotIdx=" + getSlotIdx() + ", shm=" + getShm() + ")"; - } - } - - /** - * ID for this SharedMemorySegment. - */ - private final ShmId shmId; - - /** - * The base address of the memory-mapped file. - */ - private final long baseAddress; - - /** - * The mmapped length of the shared memory segment - */ - private final int mmappedLength; - - /** - * The slots associated with this shared memory segment. - * slot[i] contains the slot at offset i * BYTES_PER_SLOT, - * or null if that slot is not allocated. - */ - private final Slot slots[]; - - /** - * A bitset where each bit represents a slot which is in use. - */ - private final BitSet allocatedSlots; - - /** - * Create the ShortCircuitShm. - * - * @param shmId The ID to use. - * @param stream The stream that we're going to use to create this - * shared memory segment. - * - * Although this is a FileInputStream, we are going to - * assume that the underlying file descriptor is writable - * as well as readable. It would be more appropriate to use - * a RandomAccessFile here, but that class does not have - * any public accessor which returns a FileDescriptor, - * unlike FileInputStream. 
- */ - public ShortCircuitShm(ShmId shmId, FileInputStream stream) - throws IOException { - if (!NativeIO.isAvailable()) { - throw new UnsupportedOperationException("NativeIO is not available."); - } - if (Shell.WINDOWS) { - throw new UnsupportedOperationException( - "DfsClientShm is not yet implemented for Windows."); - } - if (unsafe == null) { - throw new UnsupportedOperationException( - "can't use DfsClientShm because we failed to " + - "load misc.Unsafe."); - } - this.shmId = shmId; - this.mmappedLength = getUsableLength(stream); - this.baseAddress = POSIX.mmap(stream.getFD(), - POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength); - this.slots = new Slot[mmappedLength / BYTES_PER_SLOT]; - this.allocatedSlots = new BitSet(slots.length); - if (LOG.isTraceEnabled()) { - LOG.trace("creating " + this.getClass().getSimpleName() + - "(shmId=" + shmId + - ", mmappedLength=" + mmappedLength + - ", baseAddress=" + String.format("%x", baseAddress) + - ", slots.length=" + slots.length + ")"); - } - } - - public final ShmId getShmId() { - return shmId; - } - - /** - * Determine if this shared memory object is empty. - * - * @return True if the shared memory object is empty. - */ - synchronized final public boolean isEmpty() { - return allocatedSlots.nextSetBit(0) == -1; - } - - /** - * Determine if this shared memory object is full. - * - * @return True if the shared memory object is full. - */ - synchronized final public boolean isFull() { - return allocatedSlots.nextClearBit(0) >= slots.length; - } - - /** - * Calculate the base address of a slot. - * - * @param slotIdx Index of the slot. - * @return The base address of the slot. - */ - private final long calculateSlotAddress(int slotIdx) { - long offset = slotIdx; - offset *= BYTES_PER_SLOT; - return this.baseAddress + offset; - } - - /** - * Allocate a new slot and register it. - * - * This function chooses an empty slot, initializes it, and then returns - * the relevant Slot object. 
- * - * @return The new slot. - */ - synchronized public final Slot allocAndRegisterSlot( - ExtendedBlockId blockId) { - int idx = allocatedSlots.nextClearBit(0); - if (idx >= slots.length) { - throw new RuntimeException(this + ": no more slots are available."); - } - allocatedSlots.set(idx, true); - Slot slot = new Slot(calculateSlotAddress(idx), blockId); - slot.clear(); - slot.makeValid(); - slots[idx] = slot; - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots + - StringUtils.getStackTrace(Thread.currentThread())); - } - return slot; - } - - synchronized public final Slot getSlot(int slotIdx) - throws InvalidRequestException { - if (!allocatedSlots.get(slotIdx)) { - throw new InvalidRequestException(this + ": slot " + slotIdx + - " does not exist."); - } - return slots[slotIdx]; - } - - /** - * Register a slot. - * - * This function looks at a slot which has already been initialized (by - * another process), and registers it with us. Then, it returns the - * relevant Slot object. - * - * @return The slot. - * - * @throws InvalidRequestException - * If the slot index we're trying to allocate has not been - * initialized, or is already in use. 
- */ - synchronized public final Slot registerSlot(int slotIdx, - ExtendedBlockId blockId) throws InvalidRequestException { - if (slotIdx < 0) { - throw new InvalidRequestException(this + ": invalid negative slot " + - "index " + slotIdx); - } - if (slotIdx >= slots.length) { - throw new InvalidRequestException(this + ": invalid slot " + - "index " + slotIdx); - } - if (allocatedSlots.get(slotIdx)) { - throw new InvalidRequestException(this + ": slot " + slotIdx + - " is already in use."); - } - Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId); - if (!slot.isValid()) { - throw new InvalidRequestException(this + ": slot " + slotIdx + - " is not marked as valid."); - } - slots[slotIdx] = slot; - allocatedSlots.set(slotIdx, true); - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": registerSlot " + slotIdx + ": allocatedSlots=" + allocatedSlots + - StringUtils.getStackTrace(Thread.currentThread())); - } - return slot; - } - - /** - * Unregisters a slot. - * - * This doesn't alter the contents of the slot. It just means - * that this ShortCircuitShm no longer tracks the slot: its bit in - * allocatedSlots is cleared and its entry in slots is set to null. - * - * @param slotIdx Index of the slot to unregister. - */ - synchronized public final void unregisterSlot(int slotIdx) { - Preconditions.checkState(allocatedSlots.get(slotIdx), - "tried to unregister slot " + slotIdx + ", which was not registered."); - allocatedSlots.set(slotIdx, false); - slots[slotIdx] = null; - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": unregisterSlot " + slotIdx); - } - } - - /** - * Iterate over all allocated slots. - * - * Note that this method isn't safe if - * slots are registered or unregistered by another thread during the - * iteration: hasNext() and next() each take the lock separately, so - * next() may throw NoSuchElementException even after hasNext() - * returned true. - * - * @return The slot iterator. - */ - public SlotIterator slotIterator() { - return new SlotIterator(); - } - - public void free() { - try { - POSIX.munmap(baseAddress, mmappedLength); - } catch (IOException e) { - LOG.warn(this + ": failed to munmap", e); - } - LOG.trace(this + ": freed"); - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + "(" + shmId + ")"; - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java deleted file mode 100644 index 17365fb..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.util; - -import java.io.EOFException; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -import com.google.common.base.Preconditions; - -/** - * An InputStream implementation which reads from some other InputStream - * but expects an exact number of bytes. 
Any attempts to read past the - * specified number of bytes will return as if the end of the stream - * was reached. If the end of the underlying stream is reached prior to - * the specified number of bytes, an EOFException is thrown. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class ExactSizeInputStream extends FilterInputStream { - private int remaining; - - /** - * Construct an input stream that will read no more than - * 'numBytes' bytes. - * - * If an EOF occurs on the underlying stream before numBytes - * bytes have been read, an EOFException will be thrown. - * - * @param in the inputstream to wrap - * @param numBytes the number of bytes to read - */ - public ExactSizeInputStream(InputStream in, int numBytes) { - super(in); - Preconditions.checkArgument(numBytes >= 0, - "Negative expected bytes: ", numBytes); - this.remaining = numBytes; - } - - @Override - public int available() throws IOException { - return Math.min(super.available(), remaining); - } - - @Override - public int read() throws IOException { - // EOF if we reached our limit - if (remaining <= 0) { - return -1; - } - final int result = super.read(); - if (result >= 0) { - --remaining; - } else if (remaining > 0) { - // Underlying stream reached EOF but we haven't read the expected - // number of bytes. - throw new EOFException( - "Premature EOF. Expected " + remaining + "more bytes"); - } - return result; - } - - @Override - public int read(final byte[] b, final int off, int len) - throws IOException { - if (remaining <= 0) { - return -1; - } - len = Math.min(len, remaining); - final int result = super.read(b, off, len); - if (result >= 0) { - remaining -= result; - } else if (remaining > 0) { - // Underlying stream reached EOF but we haven't read the expected - // number of bytes. - throw new EOFException( - "Premature EOF. 
Expected " + remaining + "more bytes"); - } - return result; - } - - @Override - public long skip(final long n) throws IOException { - final long result = super.skip(Math.min(n, remaining)); - if (result > 0) { - remaining -= result; - } else if (remaining > 0) { - // Underlying stream reached EOF but we haven't read the expected - // number of bytes. - throw new EOFException( - "Premature EOF. Expected " + remaining + "more bytes"); - } - return result; - } - - @Override - public boolean markSupported() { - return false; - } - - @Override - public void mark(int readlimit) { - throw new UnsupportedOperationException(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a52367b..9c53874 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -482,8 +482,6 @@ Release 2.8.0 - UNRELEASED HDFS-8823. Move replication factor into individual blocks. (wheat9) - HDFS-8934. Move ShortCircuitShm to hdfs-client. (Mingliang Liu via wheat9) - OPTIMIZATIONS HDFS-8026. 
Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index fec6b85..8517173 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; @@ -592,7 +592,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { failureInjector.getSupportsReceiptVerification()); DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); DomainSocket sock = peer.getDomainSocket(); failureInjector.injectRequestFileDescriptorsFailure(); switch (resp.getStatus()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 12646b5..a7b518e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -149,7 +149,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; @@ -1928,7 +1928,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, new Sender(out).blockChecksum(block, lb.getBlockToken()); final BlockOpResponseProto reply = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); String logInfo = "for block " + block + " from datanode " + datanodes[j]; DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); @@ -1960,7 +1960,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, // read crc-type final DataChecksum.Type ct; if (checksumData.hasCrcType()) { - ct = PBHelperClient.convert(checksumData + ct = PBHelper.convert(checksumData .getCrcType()); } else { LOG.debug("Retrieving checksum from an earlier-version DataNode: " + @@ -2088,11 +2088,11 @@ public class DFSClient 
implements java.io.Closeable, RemotePeerFactory, new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true, CachingStrategy.newDefaultStrategy()); final BlockOpResponseProto reply = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn; DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); - return PBHelperClient.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); + return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index a975312..8dd85b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; @@ -1245,7 +1245,7 @@ class DataStreamer extends 
Daemon { //ack BlockOpResponseProto response = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { throw new IOException("Failed to add a datanode"); } @@ -1524,7 +1524,7 @@ class DataStreamer extends Daemon { // receive ack for connect BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(blockReplyStream)); + PBHelper.vintPrefixed(blockReplyStream)); pipelineStatus = resp.getStatus(); firstBadLink = resp.getFirstBadLink(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java new file mode 100644 index 0000000..7b9e8e3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + +/** + * An immutable key which identifies a block. + */ +@InterfaceAudience.Private +final public class ExtendedBlockId { + /** + * The block ID for this block. + */ + private final long blockId; + + /** + * The block pool ID for this block. + */ + private final String bpId; + + public static ExtendedBlockId fromExtendedBlock(ExtendedBlock block) { + return new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); + } + + public ExtendedBlockId(long blockId, String bpId) { + this.blockId = blockId; + this.bpId = bpId; + } + + public long getBlockId() { + return this.blockId; + } + + public String getBlockPoolId() { + return this.bpId; + } + + @Override + public boolean equals(Object o) { + if ((o == null) || (o.getClass() != this.getClass())) { + return false; + } + ExtendedBlockId other = (ExtendedBlockId)o; + return new EqualsBuilder(). + append(blockId, other.blockId). + append(bpId, other.bpId). + isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(). + append(this.blockId). + append(this.bpId). + toHashCode(); + } + + @Override + public String toString() { + return new StringBuilder().append(blockId). 
+ append("_").append(bpId).toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 05a9f2c..d70f419 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; @@ -414,7 +414,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { new BufferedInputStream(peer.getInputStream(), bufferSize)); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); RemoteBlockReader2.checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index 4c23d36..c368d65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; @@ -417,7 +417,7 @@ public class RemoteBlockReader2 implements BlockReader { DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java new file mode 100644 index 0000000..4792b0e --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.ReadableByteChannel; + +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Represents a peer that we communicate with by using blocking I/O + * on a UNIX domain socket. 
+ */ +@InterfaceAudience.Private +public class DomainPeer implements Peer { + private final DomainSocket socket; + private final OutputStream out; + private final InputStream in; + private final ReadableByteChannel channel; + + public DomainPeer(DomainSocket socket) { + this.socket = socket; + this.out = socket.getOutputStream(); + this.in = socket.getInputStream(); + this.channel = socket.getChannel(); + } + + @Override + public ReadableByteChannel getInputStreamChannel() { + return channel; + } + + @Override + public void setReadTimeout(int timeoutMs) throws IOException { + socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs); + } + + @Override + public int getReceiveBufferSize() throws IOException { + return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); + } + + @Override + public boolean getTcpNoDelay() throws IOException { + /* No TCP, no TCP_NODELAY. */ + return false; + } + + @Override + public void setWriteTimeout(int timeoutMs) throws IOException { + socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs); + } + + @Override + public boolean isClosed() { + return !socket.isOpen(); + } + + @Override + public void close() throws IOException { + socket.close(); + } + + @Override + public String getRemoteAddressString() { + return "unix:" + socket.getPath(); + } + + @Override + public String getLocalAddressString() { + return "<local>"; + } + + @Override + public InputStream getInputStream() throws IOException { + return in; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return out; + } + + @Override + public boolean isLocal() { + /* UNIX domain sockets can only be used for local communication. 
*/ + return true; + } + + @Override + public String toString() { + return "DomainPeer(" + getRemoteAddressString() + ")"; + } + + @Override + public DomainSocket getDomainSocket() { + return socket; + } + + @Override + public boolean hasSecureChannel() { + // + // Communication over domain sockets is assumed to be secure, since it + // doesn't pass over any network. We also carefully control the privileges + // that can be used on the domain socket inode and its parent directories. + // See #{java.org.apache.hadoop.net.unix.DomainSocket#validateSocketPathSecurity0} + // for details. + // + // So unless you are running as root or the hdfs superuser, you cannot + // launch a man-in-the-middle attack on UNIX domain socket traffic. + // + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java new file mode 100644 index 0000000..42cf287 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.ReadableByteChannel; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.net.unix.DomainSocket; + +/** + * Represents a connection to a peer. + */ +@InterfaceAudience.Private +public interface Peer extends Closeable { + /** + * @return The input stream channel associated with this + * peer, or null if it has none. + */ + public ReadableByteChannel getInputStreamChannel(); + + /** + * Set the read timeout on this peer. + * + * @param timeoutMs The timeout in milliseconds. + */ + public void setReadTimeout(int timeoutMs) throws IOException; + + /** + * @return The receive buffer size. + */ + public int getReceiveBufferSize() throws IOException; + + /** + * @return True if TCP_NODELAY is turned on. + */ + public boolean getTcpNoDelay() throws IOException; + + /** + * Set the write timeout on this peer. + * + * Note: this is not honored for BasicInetPeer. + * See {@link BasicSocketPeer#setWriteTimeout} for details. + * + * @param timeoutMs The timeout in milliseconds. + */ + public void setWriteTimeout(int timeoutMs) throws IOException; + + /** + * @return true only if the peer is closed. + */ + public boolean isClosed(); + + /** + * Close the peer. + * + * It's safe to re-close a Peer that is already closed. 
+ */ + public void close() throws IOException; + + /** + * @return A string representing the remote end of our + * connection to the peer. + */ + public String getRemoteAddressString(); + + /** + * @return A string representing the local end of our + * connection to the peer. + */ + public String getLocalAddressString(); + + /** + * @return An InputStream associated with the Peer. + * This InputStream will be valid until you close + * this peer with Peer#close. + */ + public InputStream getInputStream() throws IOException; + + /** + * @return An OutputStream associated with the Peer. + * This OutputStream will be valid until you close + * this peer with Peer#close. + */ + public OutputStream getOutputStream() throws IOException; + + /** + * @return True if the peer resides on the same + * computer as we. + */ + public boolean isLocal(); + + /** + * @return The DomainSocket associated with the current + * peer, or null if there is none. + */ + public DomainSocket getDomainSocket(); + + /** + * Return true if the channel is secure. + * + * @return True if our channel to this peer is not + * susceptible to man-in-the-middle attacks. 
+ */ + public boolean hasSecureChannel(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java new file mode 100644 index 0000000..5f86e52 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** Block Construction Stage */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public enum BlockConstructionStage { + /** Stages that have a recovery stage are listed as the regular stage + * immediately followed by its recovery stage.
+ * getRecoveryStage depends on this order, so changing it will break that method. + */ + // pipeline set up for block append + PIPELINE_SETUP_APPEND, + // pipeline set up for failed PIPELINE_SETUP_APPEND recovery + PIPELINE_SETUP_APPEND_RECOVERY, + // data streaming + DATA_STREAMING, + // pipeline setup for failed data streaming recovery + PIPELINE_SETUP_STREAMING_RECOVERY, + // close the block and pipeline + PIPELINE_CLOSE, + // Recover a failed PIPELINE_CLOSE + PIPELINE_CLOSE_RECOVERY, + // pipeline set up for block creation + PIPELINE_SETUP_CREATE, + // transfer RBW for adding datanodes + TRANSFER_RBW, + // transfer Finalized for adding datanodes + TRANSFER_FINALIZED; + + final static private byte RECOVERY_BIT = (byte)1; + + /** + * Get the recovery stage of this stage. + * + * The result is the constant at this stage's ordinal with RECOVERY_BIT set, + * so a regular stage yields the recovery stage declared right after it and + * a recovery stage yields itself. + * + * @return the stage found at that position + * @throws IllegalArgumentException if this is PIPELINE_SETUP_CREATE, or if + * no constant is declared at that position (TRANSFER_FINALIZED) + */ + public BlockConstructionStage getRecoveryStage() { + if (this == PIPELINE_SETUP_CREATE) { + throw new IllegalArgumentException( "Unexpected blockStage " + this); + } + final BlockConstructionStage[] stages = values(); + final int recoveryIndex = ordinal() | RECOVERY_BIT; + if (recoveryIndex >= stages.length) { + /* Last constant without a successor: report it like any other unsupported stage rather than indexing past the end of the array. */ + throw new IllegalArgumentException( "Unexpected blockStage " + this); + } + return stages[recoveryIndex]; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java new file mode 100644 index 0000000..284281a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.htrace.Span; +import org.apache.htrace.Trace; +import 
org.apache.htrace.TraceInfo; +import org.apache.htrace.TraceScope; + +/** + * Static utilities for dealing with the protocol buffers used by the + * Data Transfer Protocol. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class DataTransferProtoUtil { + static BlockConstructionStage fromProto( + OpWriteBlockProto.BlockConstructionStage stage) { + return BlockConstructionStage.valueOf(stage.name()); + } + + static OpWriteBlockProto.BlockConstructionStage toProto( + BlockConstructionStage stage) { + return OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()); + } + + public static ChecksumProto toProto(DataChecksum checksum) { + ChecksumTypeProto type = PBHelper.convert(checksum.getChecksumType()); + // ChecksumType#valueOf never returns null + return ChecksumProto.newBuilder() + .setBytesPerChecksum(checksum.getBytesPerChecksum()) + .setType(type) + .build(); + } + + public static DataChecksum fromProto(ChecksumProto proto) { + if (proto == null) return null; + + int bytesPerChecksum = proto.getBytesPerChecksum(); + DataChecksum.Type type = PBHelper.convert(proto.getType()); + return DataChecksum.newDataChecksum(type, bytesPerChecksum); + } + + static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk, + String client, Token<BlockTokenIdentifier> blockToken) { + ClientOperationHeaderProto header = + ClientOperationHeaderProto.newBuilder() + .setBaseHeader(buildBaseHeader(blk, blockToken)) + .setClientName(client) + .build(); + return header; + } + + static BaseHeaderProto buildBaseHeader(ExtendedBlock blk, + Token<BlockTokenIdentifier> blockToken) { + BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder() + .setBlock(PBHelper.convert(blk)) + .setToken(PBHelper.convert(blockToken)); + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() + .setTraceId(s.getTraceId()) + .setParentId(s.getSpanId())); + } + return builder.build(); + } + + public 
static TraceInfo fromProto(DataTransferTraceInfoProto proto) { + if (proto == null) return null; + if (!proto.hasTraceId()) return null; + return new TraceInfo(proto.getTraceId(), proto.getParentId()); + } + + public static TraceScope continueTraceSpan(ClientOperationHeaderProto header, + String description) { + return continueTraceSpan(header.getBaseHeader(), description); + } + + public static TraceScope continueTraceSpan(BaseHeaderProto header, + String description) { + return continueTraceSpan(header.getTraceInfo(), description); + } + + public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto, + String description) { + TraceScope scope = null; + TraceInfo info = fromProto(proto); + if (info != null) { + scope = Trace.startSpan(description, info); + } + return scope; + } + + public static void checkBlockOpStatus( + BlockOpResponseProto response, + String logInfo) throws IOException { + if (response.getStatus() != Status.SUCCESS) { + if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) { + throw new InvalidBlockTokenException( + "Got access token error" + + ", status message " + response.getMessage() + + ", " + logInfo + ); + } else { + throw new IOException( + "Got error" + + ", status message " + response.getMessage() + + ", " + logInfo + ); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java new file mode 100644 index 0000000..48e931d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -0,0 +1,201 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; + +/** + * Transfer data to/from datanode using a streaming protocol. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface DataTransferProtocol { + public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class); + + /** Version for data transfers between clients and datanodes + * This should change when serialization of DatanodeInfo, not just + * when protocol changes. It is not very obvious. 
+ */ + /* + * Version 28: + * Declare methods in DataTransferProtocol interface. + */ + public static final int DATA_TRANSFER_VERSION = 28; + + /** + * Read a block. + * + * @param blk the block being read. + * @param blockToken security token for accessing the block. + * @param clientName client's name. + * @param blockOffset offset of the block. + * @param length maximum number of bytes for this read. + * @param sendChecksum if false, the DN should skip reading and sending + * checksums + * @param cachingStrategy The caching strategy to use. + */ + public void readBlock(final ExtendedBlock blk, + final Token<BlockTokenIdentifier> blockToken, + final String clientName, + final long blockOffset, + final long length, + final boolean sendChecksum, + final CachingStrategy cachingStrategy) throws IOException; + + /** + * Write a block to a datanode pipeline. + * The receiver datanode of this call is the next datanode in the pipeline. + * The other downstream datanodes are specified by the targets parameter. + * Note that the receiver {@link DatanodeInfo} is not required in the + * parameter list since the receiver datanode knows its info. However, the + * {@link StorageType} for storing the replica in the receiver datanode is a + * parameter since the receiver datanode may support multiple storage types. + * + * @param blk the block being written. + * @param storageType for storing the replica in the receiver datanode. + * @param blockToken security token for accessing the block. + * @param clientName client's name. + * @param targets other downstream datanodes in the pipeline. + * @param targetStorageTypes target {@link StorageType}s corresponding + * to the target datanodes. + * @param source source datanode. + * @param stage pipeline stage. + * @param pipelineSize the size of the pipeline. + * @param minBytesRcvd minimum number of bytes received. + * @param maxBytesRcvd maximum number of bytes received. 
+ * @param latestGenerationStamp the latest generation stamp of the block. + * @param pinning whether to pin the block, so Balancer won't move it. + * @param targetPinnings whether to pin the block on target datanode + */ + public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, + final Token<BlockTokenIdentifier> blockToken, + final String clientName, + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, + final DatanodeInfo source, + final BlockConstructionStage stage, + final int pipelineSize, + final long minBytesRcvd, + final long maxBytesRcvd, + final long latestGenerationStamp, + final DataChecksum requestedChecksum, + final CachingStrategy cachingStrategy, + final boolean allowLazyPersist, + final boolean pinning, + final boolean[] targetPinnings) throws IOException; + /** + * Transfer a block to another datanode. + * The block stage must be + * either {@link BlockConstructionStage#TRANSFER_RBW} + * or {@link BlockConstructionStage#TRANSFER_FINALIZED}. + * + * @param blk the block being transferred. + * @param blockToken security token for accessing the block. + * @param clientName client's name. + * @param targets target datanodes. + */ + public void transferBlock(final ExtendedBlock blk, + final Token<BlockTokenIdentifier> blockToken, + final String clientName, + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException; + + /** + * Request short circuit access file descriptors from a DataNode. + * + * @param blk The block to get file descriptors for. + * @param blockToken Security token for accessing the block. + * @param slotId The shared memory slot id to use, or null + * to use no slot id. + * @param maxVersion Maximum version of the block data the client + * can understand. + * @param supportsReceiptVerification True if the client supports + * receipt verification. 
+ */ + public void requestShortCircuitFds(final ExtendedBlock blk, + final Token<BlockTokenIdentifier> blockToken, + SlotId slotId, int maxVersion, boolean supportsReceiptVerification) + throws IOException; + + /** + * Release a pair of short-circuit FDs requested earlier. + * + * @param slotId SlotID used by the earlier file descriptors. + */ + public void releaseShortCircuitFds(final SlotId slotId) throws IOException; + + /** + * Request a short circuit shared memory area from a DataNode. + * + * @param clientName The name of the client. + */ + public void requestShortCircuitShm(String clientName) throws IOException; + + /** + * Receive a block from a source datanode + * and then notifies the namenode + * to remove the copy from the original datanode. + * Note that the source datanode and the original datanode can be different. + * It is used for balancing purpose. + * + * @param blk the block being replaced. + * @param storageType the {@link StorageType} for storing the block. + * @param blockToken security token for accessing the block. + * @param delHint the hint for deleting the block in the original datanode. + * @param source the source datanode for receiving the block. + */ + public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, + final Token<BlockTokenIdentifier> blockToken, + final String delHint, + final DatanodeInfo source) throws IOException; + + /** + * Copy a block. + * It is used for balancing purpose. + * + * @param blk the block being copied. + * @param blockToken security token for accessing the block. + */ + public void copyBlock(final ExtendedBlock blk, + final Token<BlockTokenIdentifier> blockToken) throws IOException; + + /** + * Get block checksum (MD5 of CRC32). + * + * @param blk a block. + * @param blockToken security token for accessing the block. 
+ * @throws IOException + */ + public void blockChecksum(final ExtendedBlock blk, + final Token<BlockTokenIdentifier> blockToken) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java new file mode 100644 index 0000000..3077498 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** Operation */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public enum Op { + WRITE_BLOCK((byte)80), + READ_BLOCK((byte)81), + READ_METADATA((byte)82), + REPLACE_BLOCK((byte)83), + COPY_BLOCK((byte)84), + BLOCK_CHECKSUM((byte)85), + TRANSFER_BLOCK((byte)86), + REQUEST_SHORT_CIRCUIT_FDS((byte)87), + RELEASE_SHORT_CIRCUIT_FDS((byte)88), + REQUEST_SHORT_CIRCUIT_SHM((byte)89), + CUSTOM((byte)127); + + /** The code for this operation. */ + public final byte code; + + private Op(byte code) { + this.code = code; + } + + /** + * Return the operation whose code equals the given byte. + * + * Constants are matched on their code field rather than on their position + * in the declaration, because the codes are not contiguous: CUSTOM is 127 + * while the other operations use 80 through 89. + * + * @param code the operation code to look up + * @return the matching operation, or null if no operation has this code + */ + private static Op valueOf(byte code) { + for (Op op : values()) { + if (op.code == code) { + return op; + } + } + return null; + }
+ + /** Read one byte from in and return the matching operation, or null if the byte is not a known code. */ + public static Op read(DataInput in) throws IOException { + return valueOf(in.readByte()); + } + + /** Write this operation's code to out as a single byte. */ + public void write(DataOutput out) throws IOException { + out.write(code); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java index 44f38c6..a811f39 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.protocol.datatransfer; -import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.IOException; import java.io.InputStream; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 694f521..d435543 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -19,7 +19,7 @@ package
org.apache.hadoop.hdfs.protocol.datatransfer; import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto; import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan; -import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.DataInputStream; import java.io.IOException; @@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; import org.apache.htrace.TraceScope; @@ -137,7 +136,7 @@ public abstract class Receiver implements DataTransferProtocol { proto.getClass().getSimpleName()); try { writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), - PBHelperClient.convertStorageType(proto.getStorageType()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), targets, @@ -229,7 +228,7 @@ public abstract class Receiver implements DataTransferProtocol { proto.getClass().getSimpleName()); try { replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), - PBHelperClient.convertStorageType(proto.getStorageType()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getToken()), proto.getDelHint(), PBHelper.convert(proto.getSource())); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java new file mode 100644 index 0000000..df69125 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; +import org.apache.hadoop.security.token.Token; +import 
org.apache.hadoop.util.DataChecksum; + +import org.apache.htrace.Trace; +import org.apache.htrace.Span; + +import com.google.protobuf.Message; + +/** Sender: implements DataTransferProtocol by building a protobuf request for each operation and writing it to the output stream supplied at construction. */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class Sender implements DataTransferProtocol { + private final DataOutputStream out; + + /** Create a sender for DataTransferProtocol with an output stream. */ + public Sender(final DataOutputStream out) { + this.out = out; + } + + /** Initialize an operation: write the data transfer version as a short, then have the op write itself to the stream. */ + private static void op(final DataOutput out, final Op op + ) throws IOException { + out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); + op.write(out); + } + + /** Write the operation preamble for opcode, then the proto in delimited form, and flush the stream; the proto is logged first when trace logging is enabled. */ private static void send(final DataOutputStream out, final Op opcode, + final Message proto) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName() + + ": " + proto); + } + op(out, opcode); + proto.writeDelimitedTo(out); + out.flush(); + } + + /** Convert a CachingStrategy to its proto form; readahead and dropBehind are set only when their getters return non-null. */ static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) { + CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder(); + if (cachingStrategy.getReadahead() != null) { + builder.setReadahead(cachingStrategy.getReadahead().longValue()); + } + if (cachingStrategy.getDropBehind() != null) { + builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue()); + } + return builder.build(); + } + + /** Build an OpReadBlockProto (client header, offset, length, send-checksums flag, caching strategy) and send it as Op.READ_BLOCK. */ @Override + public void readBlock(final ExtendedBlock blk, + final Token<BlockTokenIdentifier> blockToken, + final String clientName, + final long blockOffset, + final long length, + final boolean sendChecksum, + final CachingStrategy cachingStrategy) throws IOException { + + OpReadBlockProto proto = OpReadBlockProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken)) + .setOffset(blockOffset) + .setLen(length) + .setSendChecksums(sendChecksum) + .setCachingStrategy(getCachingStrategy(cachingStrategy)) + .build(); + + send(out, 
Op.READ_BLOCK, proto); + } + + + /** Build an OpWriteBlockProto from the arguments and send it as Op.WRITE_BLOCK. Targets, target storage types and target pinnings are converted with a second argument of 1 (presumably a start index that skips the first element; confirm against PBHelper). The source is set only when non-null. */ @Override + public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, + final Token<BlockTokenIdentifier> blockToken, + final String clientName, + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, + final DatanodeInfo source, + final BlockConstructionStage stage, + final int pipelineSize, + final long minBytesRcvd, + final long maxBytesRcvd, + final long latestGenerationStamp, + DataChecksum requestedChecksum, + final CachingStrategy cachingStrategy, + final boolean allowLazyPersist, + final boolean pinning, + final boolean[] targetPinnings) throws IOException { + ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader( + blk, clientName, blockToken); + + ChecksumProto checksumProto = + DataTransferProtoUtil.toProto(requestedChecksum); + + OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() + .setHeader(header) + .setStorageType(PBHelper.convertStorageType(storageType)) + .addAllTargets(PBHelper.convert(targets, 1)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1)) + .setStage(toProto(stage)) + .setPipelineSize(pipelineSize) + .setMinBytesRcvd(minBytesRcvd) + .setMaxBytesRcvd(maxBytesRcvd) + .setLatestGenerationStamp(latestGenerationStamp) + .setRequestedChecksum(checksumProto) + .setCachingStrategy(getCachingStrategy(cachingStrategy)) + .setAllowLazyPersist(allowLazyPersist) + .setPinning(pinning) + .addAllTargetPinnings(PBHelper.convert(targetPinnings, 1)); + + if (source != null) { + proto.setSource(PBHelper.convertDatanodeInfo(source)); + } + + send(out, Op.WRITE_BLOCK, proto.build()); + } + + /** Build an OpTransferBlockProto (client header, targets, target storage types) and send it as Op.TRANSFER_BLOCK. */ @Override + public void transferBlock(final ExtendedBlock blk, + final Token<BlockTokenIdentifier> blockToken, + final String clientName, + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException { + + OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() + 
.setHeader(DataTransferProtoUtil.buildClientHeader( + blk, clientName, blockToken)) + .addAllTargets(PBHelper.convert(targets)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes)) + .build(); + + send(out, Op.TRANSFER_BLOCK, proto); + } + + /** Build an OpRequestShortCircuitAccessProto (base header, max version, receipt-verification flag) and send it as Op.REQUEST_SHORT_CIRCUIT_FDS; the slot id is set only when non-null. */ @Override + public void requestShortCircuitFds(final ExtendedBlock blk, + final Token<BlockTokenIdentifier> blockToken, + SlotId slotId, int maxVersion, boolean supportsReceiptVerification) + throws IOException { + OpRequestShortCircuitAccessProto.Builder builder = + OpRequestShortCircuitAccessProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader( + blk, blockToken)).setMaxVersion(maxVersion); + if (slotId != null) { + builder.setSlotId(PBHelper.convert(slotId)); + } + builder.setSupportsReceiptVerification(supportsReceiptVerification); + OpRequestShortCircuitAccessProto proto = builder.build(); + send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto); + } + + /** Build a ReleaseShortCircuitAccessRequestProto for the slot and send it as Op.RELEASE_SHORT_CIRCUIT_FDS; when tracing is active, the current span's trace id and span id (as parent id) are attached. */ @Override + public void releaseShortCircuitFds(SlotId slotId) throws IOException { + ReleaseShortCircuitAccessRequestProto.Builder builder = + ReleaseShortCircuitAccessRequestProto.newBuilder(). + setSlotId(PBHelper.convert(slotId)); + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() + .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); + } + ReleaseShortCircuitAccessRequestProto proto = builder.build(); + send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto); + } + + /** Build a ShortCircuitShmRequestProto carrying the client name and send it as Op.REQUEST_SHORT_CIRCUIT_SHM; when tracing is active, the current span's trace id and span id (as parent id) are attached. */ @Override + public void requestShortCircuitShm(String clientName) throws IOException { + ShortCircuitShmRequestProto.Builder builder = + ShortCircuitShmRequestProto.newBuilder(). 
+ setClientName(clientName); + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() + .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); + } + ShortCircuitShmRequestProto proto = builder.build(); + send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto); + } + + /** Build an OpReplaceBlockProto (base header, storage type, delete hint, source datanode) and send it as Op.REPLACE_BLOCK. */ @Override + public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, + final Token<BlockTokenIdentifier> blockToken, + final String delHint, + final DatanodeInfo source) throws IOException { + OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .setStorageType(PBHelper.convertStorageType(storageType)) + .setDelHint(delHint) + .setSource(PBHelper.convertDatanodeInfo(source)) + .build(); + + send(out, Op.REPLACE_BLOCK, proto); + } + + /** Build an OpCopyBlockProto containing only the base header and send it as Op.COPY_BLOCK. */ @Override + public void copyBlock(final ExtendedBlock blk, + final Token<BlockTokenIdentifier> blockToken) throws IOException { + OpCopyBlockProto proto = OpCopyBlockProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .build(); + + send(out, Op.COPY_BLOCK, proto); + } + + /** Build an OpBlockChecksumProto containing only the base header and send it as Op.BLOCK_CHECKSUM. */ @Override + public void blockChecksum(final ExtendedBlock blk, + final Token<BlockTokenIdentifier> blockToken) throws IOException { + OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .build(); + + send(out, Op.BLOCK_CHECKSUM, proto); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java index 852819f..398d44c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java @@ -24,7 +24,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; -import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.IOException; import java.io.InputStream; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index a628287..2bc6a18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -137,7 +137,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements throw new ServiceException(e); } return 
GetBlockLocalPathInfoResponseProto.newBuilder() - .setBlock(PBHelperClient.convert(resp.getBlock())) + .setBlock(PBHelper.convert(resp.getBlock())) .setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath()) .build(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index 53ca147..9d6375b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -185,7 +185,7 @@ public class ClientDatanodeProtocolTranslatorPB implements @Override public long getReplicaVisibleLength(ExtendedBlock b) throws IOException { GetReplicaVisibleLengthRequestProto req = GetReplicaVisibleLengthRequestProto - .newBuilder().setBlock(PBHelperClient.convert(b)).build(); + .newBuilder().setBlock(PBHelper.convert(b)).build(); try { return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength(); } catch (ServiceException e) { @@ -218,8 +218,8 @@ public class ClientDatanodeProtocolTranslatorPB implements Token<BlockTokenIdentifier> token) throws IOException { GetBlockLocalPathInfoRequestProto req = GetBlockLocalPathInfoRequestProto.newBuilder() - .setBlock(PBHelperClient.convert(block)) - .setToken(PBHelperClient.convert(token)).build(); + .setBlock(PBHelper.convert(block)) + .setToken(PBHelper.convert(token)).build(); GetBlockLocalPathInfoResponseProto resp; try { resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req);