http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java new file mode 100644 index 0000000..78325a3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java @@ -0,0 +1,647 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.shortcircuit; + +import java.io.FileInputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.BitSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Random; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.fs.InvalidRequestException; +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIO.POSIX; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import sun.misc.Unsafe; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ComparisonChain; +import com.google.common.primitives.Ints; + +/** + * A shared memory segment used to implement short-circuit reads. + */ +public class ShortCircuitShm { + private static final Logger LOG = LoggerFactory.getLogger(ShortCircuitShm.class); + + protected static final int BYTES_PER_SLOT = 64; + + private static final Unsafe unsafe = safetyDance(); + + private static Unsafe safetyDance() { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe)f.get(null); + } catch (Throwable e) { + LOG.error("failed to load misc.Unsafe", e); + } + return null; + } + + /** + * Calculate the usable size of a shared memory segment. + * We round down to a multiple of the slot size and do some validation. + * + * @param stream The stream we're using. + * @return The usable size of the shared memory segment. + */ + private static int getUsableLength(FileInputStream stream) + throws IOException { + int intSize = Ints.checkedCast(stream.getChannel().size()); + int slots = intSize / BYTES_PER_SLOT; + if (slots == 0) { + throw new IOException("size of shared memory segment was " + + intSize + ", but that is not enough to hold even one slot."); + } + return slots * BYTES_PER_SLOT; + } + + /** + * Identifies a DfsClientShm. + */ + public static class ShmId implements Comparable<ShmId> { + private static final Random random = new Random(); + private final long hi; + private final long lo; + + /** + * Generate a random ShmId. + * + * We generate ShmIds randomly to prevent a malicious client from + * successfully guessing one and using that to interfere with another + * client. + */ + public static ShmId createRandom() { + return new ShmId(random.nextLong(), random.nextLong()); + } + + public ShmId(long hi, long lo) { + this.hi = hi; + this.lo = lo; + } + + public long getHi() { + return hi; + } + + public long getLo() { + return lo; + } + + @Override + public boolean equals(Object o) { + if ((o == null) || (o.getClass() != this.getClass())) { + return false; + } + ShmId other = (ShmId)o; + return new EqualsBuilder(). + append(hi, other.hi). + append(lo, other.lo). + isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(). + append(this.hi). + append(this.lo). + toHashCode(); + } + + @Override + public String toString() { + return String.format("%016x%016x", hi, lo); + } + + @Override + public int compareTo(ShmId other) { + return ComparisonChain.start(). + compare(hi, other.hi). + compare(lo, other.lo). + result(); + } + }; + + /** + * Uniquely identifies a slot. + */ + public static class SlotId { + private final ShmId shmId; + private final int slotIdx; + + public SlotId(ShmId shmId, int slotIdx) { + this.shmId = shmId; + this.slotIdx = slotIdx; + } + + public ShmId getShmId() { + return shmId; + } + + public int getSlotIdx() { + return slotIdx; + } + + @Override + public boolean equals(Object o) { + if ((o == null) || (o.getClass() != this.getClass())) { + return false; + } + SlotId other = (SlotId)o; + return new EqualsBuilder(). + append(shmId, other.shmId). + append(slotIdx, other.slotIdx). + isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(). + append(this.shmId). + append(this.slotIdx). + toHashCode(); + } + + @Override + public String toString() { + return String.format("SlotId(%s:%d)", shmId.toString(), slotIdx); + } + } + + public class SlotIterator implements Iterator<Slot> { + int slotIdx = -1; + + @Override + public boolean hasNext() { + synchronized (ShortCircuitShm.this) { + return allocatedSlots.nextSetBit(slotIdx + 1) != -1; + } + } + + @Override + public Slot next() { + synchronized (ShortCircuitShm.this) { + int nextSlotIdx = allocatedSlots.nextSetBit(slotIdx + 1); + if (nextSlotIdx == -1) { + throw new NoSuchElementException(); + } + slotIdx = nextSlotIdx; + return slots[nextSlotIdx]; + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("SlotIterator " + + "doesn't support removal"); + } + } + + /** + * A slot containing information about a replica. + * + * The format is: + * word 0 + * bit 0:32 Slot flags (see below). + * bit 33:63 Anchor count. + * word 1:7 + * Reserved for future use, such as statistics. + * Padding is also useful for avoiding false sharing. + * + * Little-endian versus big-endian is not relevant here since both the client + * and the server reside on the same computer and use the same orientation. + */ + public class Slot { + /** + * Flag indicating that the slot is valid. + * + * The DFSClient sets this flag when it allocates a new slot within one of + * its shared memory regions. + * + * The DataNode clears this flag when the replica associated with this slot + * is no longer valid. The client itself also clears this flag when it + * believes that the DataNode is no longer using this slot to communicate. + */ + private static final long VALID_FLAG = 1L<<63; + + /** + * Flag indicating that the slot can be anchored. + */ + private static final long ANCHORABLE_FLAG = 1L<<62; + + /** + * The slot address in memory. + */ + private final long slotAddress; + + /** + * BlockId of the block this slot is used for. + */ + private final ExtendedBlockId blockId; + + Slot(long slotAddress, ExtendedBlockId blockId) { + this.slotAddress = slotAddress; + this.blockId = blockId; + } + + /** + * Get the short-circuit memory segment associated with this Slot. + * + * @return The enclosing short-circuit memory segment. + */ + public ShortCircuitShm getShm() { + return ShortCircuitShm.this; + } + + /** + * Get the ExtendedBlockId associated with this slot. + * + * @return The ExtendedBlockId of this slot. + */ + public ExtendedBlockId getBlockId() { + return blockId; + } + + /** + * Get the SlotId of this slot, containing both shmId and slotIdx. + * + * @return The SlotId of this slot. + */ + public SlotId getSlotId() { + return new SlotId(getShmId(), getSlotIdx()); + } + + /** + * Get the Slot index. + * + * @return The index of this slot. + */ + public int getSlotIdx() { + return Ints.checkedCast( + (slotAddress - baseAddress) / BYTES_PER_SLOT); + } + + /** + * Clear the slot. + */ + void clear() { + unsafe.putLongVolatile(null, this.slotAddress, 0); + } + + private boolean isSet(long flag) { + long prev = unsafe.getLongVolatile(null, this.slotAddress); + return (prev & flag) != 0; + } + + private void setFlag(long flag) { + long prev; + do { + prev = unsafe.getLongVolatile(null, this.slotAddress); + if ((prev & flag) != 0) { + return; + } + } while (!unsafe.compareAndSwapLong(null, this.slotAddress, + prev, prev | flag)); + } + + private void clearFlag(long flag) { + long prev; + do { + prev = unsafe.getLongVolatile(null, this.slotAddress); + if ((prev & flag) == 0) { + return; + } + } while (!unsafe.compareAndSwapLong(null, this.slotAddress, + prev, prev & (~flag))); + } + + public boolean isValid() { + return isSet(VALID_FLAG); + } + + public void makeValid() { + setFlag(VALID_FLAG); + } + + public void makeInvalid() { + clearFlag(VALID_FLAG); + } + + public boolean isAnchorable() { + return isSet(ANCHORABLE_FLAG); + } + + public void makeAnchorable() { + setFlag(ANCHORABLE_FLAG); + } + + public void makeUnanchorable() { + clearFlag(ANCHORABLE_FLAG); + } + + public boolean isAnchored() { + long prev = unsafe.getLongVolatile(null, this.slotAddress); + if ((prev & VALID_FLAG) == 0) { + // Slot is no longer valid. + return false; + } + return ((prev & 0x7fffffff) != 0); + } + + /** + * Try to add an anchor for a given slot. + * + * When a slot is anchored, we know that the block it refers to is resident + * in memory. + * + * @return True if the slot is anchored. + */ + public boolean addAnchor() { + long prev; + do { + prev = unsafe.getLongVolatile(null, this.slotAddress); + if ((prev & VALID_FLAG) == 0) { + // Slot is no longer valid. + return false; + } + if ((prev & ANCHORABLE_FLAG) == 0) { + // Slot can't be anchored right now. + return false; + } + if ((prev & 0x7fffffff) == 0x7fffffff) { + // Too many other threads have anchored the slot (2 billion?) + return false; + } + } while (!unsafe.compareAndSwapLong(null, this.slotAddress, + prev, prev + 1)); + return true; + } + + /** + * Remove an anchor for a given slot. + */ + public void removeAnchor() { + long prev; + do { + prev = unsafe.getLongVolatile(null, this.slotAddress); + Preconditions.checkState((prev & 0x7fffffff) != 0, + "Tried to remove anchor for slot " + slotAddress +", which was " + + "not anchored."); + } while (!unsafe.compareAndSwapLong(null, this.slotAddress, + prev, prev - 1)); + } + + @Override + public String toString() { + return "Slot(slotIdx=" + getSlotIdx() + ", shm=" + getShm() + ")"; + } + } + + /** + * ID for this SharedMemorySegment. + */ + private final ShmId shmId; + + /** + * The base address of the memory-mapped file. + */ + private final long baseAddress; + + /** + * The mmapped length of the shared memory segment + */ + private final int mmappedLength; + + /** + * The slots associated with this shared memory segment. + * slot[i] contains the slot at offset i * BYTES_PER_SLOT, + * or null if that slot is not allocated. + */ + private final Slot slots[]; + + /** + * A bitset where each bit represents a slot which is in use. + */ + private final BitSet allocatedSlots; + + /** + * Create the ShortCircuitShm. + * + * @param shmId The ID to use. + * @param stream The stream that we're going to use to create this + * shared memory segment. + * + * Although this is a FileInputStream, we are going to + * assume that the underlying file descriptor is writable + * as well as readable. It would be more appropriate to use + * a RandomAccessFile here, but that class does not have + * any public accessor which returns a FileDescriptor, + * unlike FileInputStream. + */ + public ShortCircuitShm(ShmId shmId, FileInputStream stream) + throws IOException { + if (!NativeIO.isAvailable()) { + throw new UnsupportedOperationException("NativeIO is not available."); + } + if (Shell.WINDOWS) { + throw new UnsupportedOperationException( + "DfsClientShm is not yet implemented for Windows."); + } + if (unsafe == null) { + throw new UnsupportedOperationException( + "can't use DfsClientShm because we failed to " + + "load misc.Unsafe."); + } + this.shmId = shmId; + this.mmappedLength = getUsableLength(stream); + this.baseAddress = POSIX.mmap(stream.getFD(), + POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength); + this.slots = new Slot[mmappedLength / BYTES_PER_SLOT]; + this.allocatedSlots = new BitSet(slots.length); + if (LOG.isTraceEnabled()) { + LOG.trace("creating " + this.getClass().getSimpleName() + + "(shmId=" + shmId + + ", mmappedLength=" + mmappedLength + + ", baseAddress=" + String.format("%x", baseAddress) + + ", slots.length=" + slots.length + ")"); + } + } + + public final ShmId getShmId() { + return shmId; + } + + /** + * Determine if this shared memory object is empty. + * + * @return True if the shared memory object is empty. + */ + synchronized final public boolean isEmpty() { + return allocatedSlots.nextSetBit(0) == -1; + } + + /** + * Determine if this shared memory object is full. + * + * @return True if the shared memory object is full. + */ + synchronized final public boolean isFull() { + return allocatedSlots.nextClearBit(0) >= slots.length; + } + + /** + * Calculate the base address of a slot. + * + * @param slotIdx Index of the slot. + * @return The base address of the slot. + */ + private final long calculateSlotAddress(int slotIdx) { + long offset = slotIdx; + offset *= BYTES_PER_SLOT; + return this.baseAddress + offset; + } + + /** + * Allocate a new slot and register it. + * + * This function chooses an empty slot, initializes it, and then returns + * the relevant Slot object. + * + * @return The new slot. + */ + synchronized public final Slot allocAndRegisterSlot( + ExtendedBlockId blockId) { + int idx = allocatedSlots.nextClearBit(0); + if (idx >= slots.length) { + throw new RuntimeException(this + ": no more slots are available."); + } + allocatedSlots.set(idx, true); + Slot slot = new Slot(calculateSlotAddress(idx), blockId); + slot.clear(); + slot.makeValid(); + slots[idx] = slot; + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots + + StringUtils.getStackTrace(Thread.currentThread())); + } + return slot; + } + + synchronized public final Slot getSlot(int slotIdx) + throws InvalidRequestException { + if (!allocatedSlots.get(slotIdx)) { + throw new InvalidRequestException(this + ": slot " + slotIdx + + " does not exist."); + } + return slots[slotIdx]; + } + + /** + * Register a slot. + * + * This function looks at a slot which has already been initialized (by + * another process), and registers it with us. Then, it returns the + * relevant Slot object. + * + * @return The slot. + * + * @throws InvalidRequestException + * If the slot index we're trying to allocate has not been + * initialized, or is already in use. + */ + synchronized public final Slot registerSlot(int slotIdx, + ExtendedBlockId blockId) throws InvalidRequestException { + if (slotIdx < 0) { + throw new InvalidRequestException(this + ": invalid negative slot " + + "index " + slotIdx); + } + if (slotIdx >= slots.length) { + throw new InvalidRequestException(this + ": invalid slot " + + "index " + slotIdx); + } + if (allocatedSlots.get(slotIdx)) { + throw new InvalidRequestException(this + ": slot " + slotIdx + + " is already in use."); + } + Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId); + if (!slot.isValid()) { + throw new InvalidRequestException(this + ": slot " + slotIdx + + " is not marked as valid."); + } + slots[slotIdx] = slot; + allocatedSlots.set(slotIdx, true); + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": registerSlot " + slotIdx + ": allocatedSlots=" + allocatedSlots + + StringUtils.getStackTrace(Thread.currentThread())); + } + return slot; + } + + /** + * Unregisters a slot. + * + * This doesn't alter the contents of the slot. It just means + * + * @param slotIdx Index of the slot to unregister. + */ + synchronized public final void unregisterSlot(int slotIdx) { + Preconditions.checkState(allocatedSlots.get(slotIdx), + "tried to unregister slot " + slotIdx + ", which was not registered."); + allocatedSlots.set(slotIdx, false); + slots[slotIdx] = null; + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": unregisterSlot " + slotIdx); + } + } + + /** + * Iterate over all allocated slots. + * + * Note that this method isn't safe if + * + * @return The slot iterator. + */ + public SlotIterator slotIterator() { + return new SlotIterator(); + } + + public void free() { + try { + POSIX.munmap(baseAddress, mmappedLength); + } catch (IOException e) { + LOG.warn(this + ": failed to munmap", e); + } + LOG.trace(this + ": freed"); + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "(" + shmId + ")"; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java new file mode 100644 index 0000000..17365fb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.io.EOFException; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; + +/** + * An InputStream implementations which reads from some other InputStream + * but expects an exact number of bytes. Any attempts to read past the + * specified number of bytes will return as if the end of the stream + * was reached. If the end of the underlying stream is reached prior to + * the specified number of bytes, an EOFException is thrown. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ExactSizeInputStream extends FilterInputStream { + private int remaining; + + /** + * Construct an input stream that will read no more than + * 'numBytes' bytes. + * + * If an EOF occurs on the underlying stream before numBytes + * bytes have been read, an EOFException will be thrown. + * + * @param in the inputstream to wrap + * @param numBytes the number of bytes to read + */ + public ExactSizeInputStream(InputStream in, int numBytes) { + super(in); + Preconditions.checkArgument(numBytes >= 0, + "Negative expected bytes: ", numBytes); + this.remaining = numBytes; + } + + @Override + public int available() throws IOException { + return Math.min(super.available(), remaining); + } + + @Override + public int read() throws IOException { + // EOF if we reached our limit + if (remaining <= 0) { + return -1; + } + final int result = super.read(); + if (result >= 0) { + --remaining; + } else if (remaining > 0) { + // Underlying stream reached EOF but we haven't read the expected + // number of bytes. + throw new EOFException( + "Premature EOF. Expected " + remaining + "more bytes"); + } + return result; + } + + @Override + public int read(final byte[] b, final int off, int len) + throws IOException { + if (remaining <= 0) { + return -1; + } + len = Math.min(len, remaining); + final int result = super.read(b, off, len); + if (result >= 0) { + remaining -= result; + } else if (remaining > 0) { + // Underlying stream reached EOF but we haven't read the expected + // number of bytes. + throw new EOFException( + "Premature EOF. Expected " + remaining + "more bytes"); + } + return result; + } + + @Override + public long skip(final long n) throws IOException { + final long result = super.skip(Math.min(n, remaining)); + if (result > 0) { + remaining -= result; + } else if (remaining > 0) { + // Underlying stream reached EOF but we haven't read the expected + // number of bytes. + throw new EOFException( + "Premature EOF. Expected " + remaining + "more bytes"); + } + return result; + } + + @Override + public boolean markSupported() { + return false; + } + + @Override + public void mark(int readlimit) { + throw new UnsupportedOperationException(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 08602d4..78f69fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -827,6 +827,8 @@ Release 2.8.0 - UNRELEASED HDFS-8823. Move replication factor into individual blocks. (wheat9) + HDFS-8934. Move ShortCircuitShm to hdfs-client. (Mingliang Liu via wheat9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 8517173..fec6b85 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; @@ -592,7 +592,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { failureInjector.getSupportsReceiptVerification()); DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - PBHelper.vintPrefixed(in)); + PBHelperClient.vintPrefixed(in)); DomainSocket sock = peer.getDomainSocket(); failureInjector.injectRequestFileDescriptorsFailure(); switch (resp.getStatus()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 99dbb19..47aaed6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -144,7 +144,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; @@ -1853,7 +1853,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, new Sender(out).blockChecksum(block, lb.getBlockToken()); final BlockOpResponseProto reply = - BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); + BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); String logInfo = "for block " + block + " from datanode " + datanodes[j]; DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); @@ -1885,7 +1885,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, // read crc-type final DataChecksum.Type ct; if (checksumData.hasCrcType()) { - ct = PBHelper.convert(checksumData + ct = PBHelperClient.convert(checksumData .getCrcType()); } else { LOG.debug("Retrieving checksum from an earlier-version DataNode: " + @@ -2013,11 +2013,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true, CachingStrategy.newDefaultStrategy()); final BlockOpResponseProto reply = - BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); + BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn; DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); - return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); + return PBHelperClient.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 8dd85b7..a975312 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; @@ -1245,7 +1245,7 @@ class DataStreamer extends Daemon { //ack BlockOpResponseProto response = - BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); + BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { throw new IOException("Failed to add a datanode"); } @@ -1524,7 +1524,7 @@ class DataStreamer extends Daemon { // receive ack for connect BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - PBHelper.vintPrefixed(blockReplyStream)); + PBHelperClient.vintPrefixed(blockReplyStream)); pipelineStatus = resp.getStatus(); firstBadLink = resp.getFirstBadLink(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java deleted file mode 100644 index 7b9e8e3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.commons.lang.builder.EqualsBuilder; -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; - -/** - * An immutable key which identifies a block. - */ -@InterfaceAudience.Private -final public class ExtendedBlockId { - /** - * The block ID for this block. - */ - private final long blockId; - - /** - * The block pool ID for this block. - */ - private final String bpId; - - public static ExtendedBlockId fromExtendedBlock(ExtendedBlock block) { - return new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); - } - - public ExtendedBlockId(long blockId, String bpId) { - this.blockId = blockId; - this.bpId = bpId; - } - - public long getBlockId() { - return this.blockId; - } - - public String getBlockPoolId() { - return this.bpId; - } - - @Override - public boolean equals(Object o) { - if ((o == null) || (o.getClass() != this.getClass())) { - return false; - } - ExtendedBlockId other = (ExtendedBlockId)o; - return new EqualsBuilder(). - append(blockId, other.blockId). - append(bpId, other.bpId). - isEquals(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(). - append(this.blockId). - append(this.bpId). - toHashCode(); - } - - @Override - public String toString() { - return new StringBuilder().append(blockId). - append("_").append(bpId).toString(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index d70f419..05a9f2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; @@ -414,7 +414,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { new BufferedInputStream(peer.getInputStream(), bufferSize)); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - PBHelper.vintPrefixed(in)); + PBHelperClient.vintPrefixed(in)); RemoteBlockReader2.checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index c368d65..4c23d36 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; @@ -417,7 +417,7 @@ public class RemoteBlockReader2 implements BlockReader { DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - PBHelper.vintPrefixed(in)); + PBHelperClient.vintPrefixed(in)); checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java deleted file mode 100644 index 4792b0e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.net; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.channels.ReadableByteChannel; - -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.classification.InterfaceAudience; - -/** - * Represents a peer that we communicate with by using blocking I/O - * on a UNIX domain socket. - */ -@InterfaceAudience.Private -public class DomainPeer implements Peer { - private final DomainSocket socket; - private final OutputStream out; - private final InputStream in; - private final ReadableByteChannel channel; - - public DomainPeer(DomainSocket socket) { - this.socket = socket; - this.out = socket.getOutputStream(); - this.in = socket.getInputStream(); - this.channel = socket.getChannel(); - } - - @Override - public ReadableByteChannel getInputStreamChannel() { - return channel; - } - - @Override - public void setReadTimeout(int timeoutMs) throws IOException { - socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs); - } - - @Override - public int getReceiveBufferSize() throws IOException { - return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); - } - - @Override - public boolean getTcpNoDelay() throws IOException { - /* No TCP, no TCP_NODELAY. */ - return false; - } - - @Override - public void setWriteTimeout(int timeoutMs) throws IOException { - socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs); - } - - @Override - public boolean isClosed() { - return !socket.isOpen(); - } - - @Override - public void close() throws IOException { - socket.close(); - } - - @Override - public String getRemoteAddressString() { - return "unix:" + socket.getPath(); - } - - @Override - public String getLocalAddressString() { - return "<local>"; - } - - @Override - public InputStream getInputStream() throws IOException { - return in; - } - - @Override - public OutputStream getOutputStream() throws IOException { - return out; - } - - @Override - public boolean isLocal() { - /* UNIX domain sockets can only be used for local communication. */ - return true; - } - - @Override - public String toString() { - return "DomainPeer(" + getRemoteAddressString() + ")"; - } - - @Override - public DomainSocket getDomainSocket() { - return socket; - } - - @Override - public boolean hasSecureChannel() { - // - // Communication over domain sockets is assumed to be secure, since it - // doesn't pass over any network. We also carefully control the privileges - // that can be used on the domain socket inode and its parent directories. - // See #{java.org.apache.hadoop.net.unix.DomainSocket#validateSocketPathSecurity0} - // for details. - // - // So unless you are running as root or the hdfs superuser, you cannot - // launch a man-in-the-middle attach on UNIX domain socket traffic. - // - return true; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java deleted file mode 100644 index 42cf287..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.net; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.channels.ReadableByteChannel; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.net.unix.DomainSocket; - -/** - * Represents a connection to a peer. - */ -@InterfaceAudience.Private -public interface Peer extends Closeable { - /** - * @return The input stream channel associated with this - * peer, or null if it has none. - */ - public ReadableByteChannel getInputStreamChannel(); - - /** - * Set the read timeout on this peer. - * - * @param timeoutMs The timeout in milliseconds. - */ - public void setReadTimeout(int timeoutMs) throws IOException; - - /** - * @return The receive buffer size. - */ - public int getReceiveBufferSize() throws IOException; - - /** - * @return True if TCP_NODELAY is turned on. - */ - public boolean getTcpNoDelay() throws IOException; - - /** - * Set the write timeout on this peer. - * - * Note: this is not honored for BasicInetPeer. - * See {@link BasicSocketPeer#setWriteTimeout} for details. - * - * @param timeoutMs The timeout in milliseconds. - */ - public void setWriteTimeout(int timeoutMs) throws IOException; - - /** - * @return true only if the peer is closed. - */ - public boolean isClosed(); - - /** - * Close the peer. - * - * It's safe to re-close a Peer that is already closed. - */ - public void close() throws IOException; - - /** - * @return A string representing the remote end of our - * connection to the peer. - */ - public String getRemoteAddressString(); - - /** - * @return A string representing the local end of our - * connection to the peer. - */ - public String getLocalAddressString(); - - /** - * @return An InputStream associated with the Peer. - * This InputStream will be valid until you close - * this peer with Peer#close. - */ - public InputStream getInputStream() throws IOException; - - /** - * @return An OutputStream associated with the Peer. - * This OutputStream will be valid until you close - * this peer with Peer#close. - */ - public OutputStream getOutputStream() throws IOException; - - /** - * @return True if the peer resides on the same - * computer as we. - */ - public boolean isLocal(); - - /** - * @return The DomainSocket associated with the current - * peer, or null if there is none. - */ - public DomainSocket getDomainSocket(); - - /** - * Return true if the channel is secure. - * - * @return True if our channel to this peer is not - * susceptible to man-in-the-middle attacks. - */ - public boolean hasSecureChannel(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java deleted file mode 100644 index 5f86e52..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** Block Construction Stage */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public enum BlockConstructionStage { - /** The enumerates are always listed as regular stage followed by the - * recovery stage. - * Changing this order will make getRecoveryStage not working. - */ - // pipeline set up for block append - PIPELINE_SETUP_APPEND, - // pipeline set up for failed PIPELINE_SETUP_APPEND recovery - PIPELINE_SETUP_APPEND_RECOVERY, - // data streaming - DATA_STREAMING, - // pipeline setup for failed data streaming recovery - PIPELINE_SETUP_STREAMING_RECOVERY, - // close the block and pipeline - PIPELINE_CLOSE, - // Recover a failed PIPELINE_CLOSE - PIPELINE_CLOSE_RECOVERY, - // pipeline set up for block creation - PIPELINE_SETUP_CREATE, - // transfer RBW for adding datanodes - TRANSFER_RBW, - // transfer Finalized for adding datanodes - TRANSFER_FINALIZED; - - final static private byte RECOVERY_BIT = (byte)1; - - /** - * get the recovery stage of this stage - */ - public BlockConstructionStage getRecoveryStage() { - if (this == PIPELINE_SETUP_CREATE) { - throw new IllegalArgumentException( "Unexpected blockStage " + this); - } else { - return values()[ordinal()|RECOVERY_BIT]; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java deleted file mode 100644 index 284281a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceInfo; -import org.apache.htrace.TraceScope; - -/** - * Static utilities for dealing with the protocol buffers used by the - * Data Transfer Protocol. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public abstract class DataTransferProtoUtil { - static BlockConstructionStage fromProto( - OpWriteBlockProto.BlockConstructionStage stage) { - return BlockConstructionStage.valueOf(stage.name()); - } - - static OpWriteBlockProto.BlockConstructionStage toProto( - BlockConstructionStage stage) { - return OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()); - } - - public static ChecksumProto toProto(DataChecksum checksum) { - ChecksumTypeProto type = PBHelper.convert(checksum.getChecksumType()); - // ChecksumType#valueOf never returns null - return ChecksumProto.newBuilder() - .setBytesPerChecksum(checksum.getBytesPerChecksum()) - .setType(type) - .build(); - } - - public static DataChecksum fromProto(ChecksumProto proto) { - if (proto == null) return null; - - int bytesPerChecksum = proto.getBytesPerChecksum(); - DataChecksum.Type type = PBHelper.convert(proto.getType()); - return DataChecksum.newDataChecksum(type, bytesPerChecksum); - } - - static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk, - String client, Token<BlockTokenIdentifier> blockToken) { - ClientOperationHeaderProto header = - ClientOperationHeaderProto.newBuilder() - .setBaseHeader(buildBaseHeader(blk, blockToken)) - .setClientName(client) - .build(); - return header; - } - - static BaseHeaderProto buildBaseHeader(ExtendedBlock blk, - Token<BlockTokenIdentifier> blockToken) { - BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder() - .setBlock(PBHelper.convert(blk)) - .setToken(PBHelper.convert(blockToken)); - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); - builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() - .setTraceId(s.getTraceId()) - .setParentId(s.getSpanId())); - } - return builder.build(); - } - - public static TraceInfo fromProto(DataTransferTraceInfoProto proto) { - if (proto == null) return null; - if (!proto.hasTraceId()) return null; - return new TraceInfo(proto.getTraceId(), proto.getParentId()); - } - - public static TraceScope continueTraceSpan(ClientOperationHeaderProto header, - String description) { - return continueTraceSpan(header.getBaseHeader(), description); - } - - public static TraceScope continueTraceSpan(BaseHeaderProto header, - String description) { - return continueTraceSpan(header.getTraceInfo(), description); - } - - public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto, - String description) { - TraceScope scope = null; - TraceInfo info = fromProto(proto); - if (info != null) { - scope = Trace.startSpan(description, info); - } - return scope; - } - - public static void checkBlockOpStatus( - BlockOpResponseProto response, - String logInfo) throws IOException { - if (response.getStatus() != Status.SUCCESS) { - if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) { - throw new InvalidBlockTokenException( - "Got access token error" - + ", status message " + response.getMessage() - + ", " + logInfo - ); - } else { - throw new IOException( - "Got error" - + ", status message " + response.getMessage() - + ", " + logInfo - ); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java deleted file mode 100644 index 48e931d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; - -/** - * Transfer data to/from datanode using a streaming protocol. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface DataTransferProtocol { - public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class); - - /** Version for data transfers between clients and datanodes - * This should change when serialization of DatanodeInfo, not just - * when protocol changes. It is not very obvious. - */ - /* - * Version 28: - * Declare methods in DataTransferProtocol interface. - */ - public static final int DATA_TRANSFER_VERSION = 28; - - /** - * Read a block. - * - * @param blk the block being read. - * @param blockToken security token for accessing the block. - * @param clientName client's name. - * @param blockOffset offset of the block. - * @param length maximum number of bytes for this read. - * @param sendChecksum if false, the DN should skip reading and sending - * checksums - * @param cachingStrategy The caching strategy to use. - */ - public void readBlock(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken, - final String clientName, - final long blockOffset, - final long length, - final boolean sendChecksum, - final CachingStrategy cachingStrategy) throws IOException; - - /** - * Write a block to a datanode pipeline. - * The receiver datanode of this call is the next datanode in the pipeline. - * The other downstream datanodes are specified by the targets parameter. - * Note that the receiver {@link DatanodeInfo} is not required in the - * parameter list since the receiver datanode knows its info. However, the - * {@link StorageType} for storing the replica in the receiver datanode is a - * parameter since the receiver datanode may support multiple storage types. - * - * @param blk the block being written. - * @param storageType for storing the replica in the receiver datanode. - * @param blockToken security token for accessing the block. - * @param clientName client's name. - * @param targets other downstream datanodes in the pipeline. - * @param targetStorageTypes target {@link StorageType}s corresponding - * to the target datanodes. - * @param source source datanode. - * @param stage pipeline stage. - * @param pipelineSize the size of the pipeline. - * @param minBytesRcvd minimum number of bytes received. - * @param maxBytesRcvd maximum number of bytes received. - * @param latestGenerationStamp the latest generation stamp of the block. - * @param pinning whether to pin the block, so Balancer won't move it. - * @param targetPinnings whether to pin the block on target datanode - */ - public void writeBlock(final ExtendedBlock blk, - final StorageType storageType, - final Token<BlockTokenIdentifier> blockToken, - final String clientName, - final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes, - final DatanodeInfo source, - final BlockConstructionStage stage, - final int pipelineSize, - final long minBytesRcvd, - final long maxBytesRcvd, - final long latestGenerationStamp, - final DataChecksum requestedChecksum, - final CachingStrategy cachingStrategy, - final boolean allowLazyPersist, - final boolean pinning, - final boolean[] targetPinnings) throws IOException; - /** - * Transfer a block to another datanode. - * The block stage must be - * either {@link BlockConstructionStage#TRANSFER_RBW} - * or {@link BlockConstructionStage#TRANSFER_FINALIZED}. - * - * @param blk the block being transferred. - * @param blockToken security token for accessing the block. - * @param clientName client's name. - * @param targets target datanodes. - */ - public void transferBlock(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken, - final String clientName, - final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes) throws IOException; - - /** - * Request short circuit access file descriptors from a DataNode. - * - * @param blk The block to get file descriptors for. - * @param blockToken Security token for accessing the block. - * @param slotId The shared memory slot id to use, or null - * to use no slot id. - * @param maxVersion Maximum version of the block data the client - * can understand. - * @param supportsReceiptVerification True if the client supports - * receipt verification. - */ - public void requestShortCircuitFds(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken, - SlotId slotId, int maxVersion, boolean supportsReceiptVerification) - throws IOException; - - /** - * Release a pair of short-circuit FDs requested earlier. - * - * @param slotId SlotID used by the earlier file descriptors. - */ - public void releaseShortCircuitFds(final SlotId slotId) throws IOException; - - /** - * Request a short circuit shared memory area from a DataNode. - * - * @param clientName The name of the client. - */ - public void requestShortCircuitShm(String clientName) throws IOException; - - /** - * Receive a block from a source datanode - * and then notifies the namenode - * to remove the copy from the original datanode. - * Note that the source datanode and the original datanode can be different. - * It is used for balancing purpose. - * - * @param blk the block being replaced. - * @param storageType the {@link StorageType} for storing the block. - * @param blockToken security token for accessing the block. - * @param delHint the hint for deleting the block in the original datanode. - * @param source the source datanode for receiving the block. - */ - public void replaceBlock(final ExtendedBlock blk, - final StorageType storageType, - final Token<BlockTokenIdentifier> blockToken, - final String delHint, - final DatanodeInfo source) throws IOException; - - /** - * Copy a block. - * It is used for balancing purpose. - * - * @param blk the block being copied. - * @param blockToken security token for accessing the block. - */ - public void copyBlock(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken) throws IOException; - - /** - * Get block checksum (MD5 of CRC32). - * - * @param blk a block. - * @param blockToken security token for accessing the block. - * @throws IOException - */ - public void blockChecksum(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java deleted file mode 100644 index 3077498..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** Operation */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public enum Op { - WRITE_BLOCK((byte)80), - READ_BLOCK((byte)81), - READ_METADATA((byte)82), - REPLACE_BLOCK((byte)83), - COPY_BLOCK((byte)84), - BLOCK_CHECKSUM((byte)85), - TRANSFER_BLOCK((byte)86), - REQUEST_SHORT_CIRCUIT_FDS((byte)87), - RELEASE_SHORT_CIRCUIT_FDS((byte)88), - REQUEST_SHORT_CIRCUIT_SHM((byte)89), - CUSTOM((byte)127); - - /** The code for this operation. */ - public final byte code; - - private Op(byte code) { - this.code = code; - } - - private static final int FIRST_CODE = values()[0].code; - /** Return the object represented by the code. */ - private static Op valueOf(byte code) { - final int i = (code & 0xff) - FIRST_CODE; - return i < 0 || i >= values().length? null: values()[i]; - } - - /** Read from in */ - public static Op read(DataInput in) throws IOException { - return valueOf(in.readByte()); - } - - /** Write to out */ - public void write(DataOutput out) throws IOException { - out.write(code); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java index a811f39..44f38c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.protocol.datatransfer; -import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; import java.io.IOException; import java.io.InputStream; http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index d435543..694f521 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.protocol.datatransfer; import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto; import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan; -import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; import java.io.DataInputStream; import java.io.IOException; @@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; import org.apache.htrace.TraceScope; @@ -136,7 +137,7 @@ public abstract class Receiver implements DataTransferProtocol { proto.getClass().getSimpleName()); try { writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), - PBHelper.convertStorageType(proto.getStorageType()), + PBHelperClient.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), targets, @@ -228,7 +229,7 @@ public abstract class Receiver implements DataTransferProtocol { proto.getClass().getSimpleName()); try { replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), - PBHelper.convertStorageType(proto.getStorageType()), + PBHelperClient.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getToken()), proto.getDelHint(), PBHelper.convert(proto.getSource())); http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java deleted file mode 100644 index df69125..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto; - -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; - -import org.apache.htrace.Trace; -import org.apache.htrace.Span; - -import com.google.protobuf.Message; - -/** Sender */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class Sender implements DataTransferProtocol { - private final DataOutputStream out; - - /** Create a sender for DataTransferProtocol with a output stream. */ - public Sender(final DataOutputStream out) { - this.out = out; - } - - /** Initialize a operation. */ - private static void op(final DataOutput out, final Op op - ) throws IOException { - out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); - op.write(out); - } - - private static void send(final DataOutputStream out, final Op opcode, - final Message proto) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName() - + ": " + proto); - } - op(out, opcode); - proto.writeDelimitedTo(out); - out.flush(); - } - - static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) { - CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder(); - if (cachingStrategy.getReadahead() != null) { - builder.setReadahead(cachingStrategy.getReadahead().longValue()); - } - if (cachingStrategy.getDropBehind() != null) { - builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue()); - } - return builder.build(); - } - - @Override - public void readBlock(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken, - final String clientName, - final long blockOffset, - final long length, - final boolean sendChecksum, - final CachingStrategy cachingStrategy) throws IOException { - - OpReadBlockProto proto = OpReadBlockProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken)) - .setOffset(blockOffset) - .setLen(length) - .setSendChecksums(sendChecksum) - .setCachingStrategy(getCachingStrategy(cachingStrategy)) - .build(); - - send(out, Op.READ_BLOCK, proto); - } - - - @Override - public void writeBlock(final ExtendedBlock blk, - final StorageType storageType, - final Token<BlockTokenIdentifier> blockToken, - final String clientName, - final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes, - final DatanodeInfo source, - final BlockConstructionStage stage, - final int pipelineSize, - final long minBytesRcvd, - final long maxBytesRcvd, - final long latestGenerationStamp, - DataChecksum requestedChecksum, - final CachingStrategy cachingStrategy, - final boolean allowLazyPersist, - final boolean pinning, - final boolean[] targetPinnings) throws IOException { - ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader( - blk, clientName, blockToken); - - ChecksumProto checksumProto = - DataTransferProtoUtil.toProto(requestedChecksum); - - OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() - .setHeader(header) - .setStorageType(PBHelper.convertStorageType(storageType)) - .addAllTargets(PBHelper.convert(targets, 1)) - .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1)) - .setStage(toProto(stage)) - .setPipelineSize(pipelineSize) - .setMinBytesRcvd(minBytesRcvd) - .setMaxBytesRcvd(maxBytesRcvd) - .setLatestGenerationStamp(latestGenerationStamp) - .setRequestedChecksum(checksumProto) - .setCachingStrategy(getCachingStrategy(cachingStrategy)) - .setAllowLazyPersist(allowLazyPersist) - .setPinning(pinning) - .addAllTargetPinnings(PBHelper.convert(targetPinnings, 1)); - - if (source != null) { - proto.setSource(PBHelper.convertDatanodeInfo(source)); - } - - send(out, Op.WRITE_BLOCK, proto.build()); - } - - @Override - public void transferBlock(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken, - final String clientName, - final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes) throws IOException { - - OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildClientHeader( - blk, clientName, blockToken)) - .addAllTargets(PBHelper.convert(targets)) - .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes)) - .build(); - - send(out, Op.TRANSFER_BLOCK, proto); - } - - @Override - public void requestShortCircuitFds(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken, - SlotId slotId, int maxVersion, boolean supportsReceiptVerification) - throws IOException { - OpRequestShortCircuitAccessProto.Builder builder = - OpRequestShortCircuitAccessProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader( - blk, blockToken)).setMaxVersion(maxVersion); - if (slotId != null) { - builder.setSlotId(PBHelper.convert(slotId)); - } - builder.setSupportsReceiptVerification(supportsReceiptVerification); - OpRequestShortCircuitAccessProto proto = builder.build(); - send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto); - } - - @Override - public void releaseShortCircuitFds(SlotId slotId) throws IOException { - ReleaseShortCircuitAccessRequestProto.Builder builder = - ReleaseShortCircuitAccessRequestProto.newBuilder(). - setSlotId(PBHelper.convert(slotId)); - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); - builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() - .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); - } - ReleaseShortCircuitAccessRequestProto proto = builder.build(); - send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto); - } - - @Override - public void requestShortCircuitShm(String clientName) throws IOException { - ShortCircuitShmRequestProto.Builder builder = - ShortCircuitShmRequestProto.newBuilder(). - setClientName(clientName); - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); - builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() - .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); - } - ShortCircuitShmRequestProto proto = builder.build(); - send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto); - } - - @Override - public void replaceBlock(final ExtendedBlock blk, - final StorageType storageType, - final Token<BlockTokenIdentifier> blockToken, - final String delHint, - final DatanodeInfo source) throws IOException { - OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) - .setStorageType(PBHelper.convertStorageType(storageType)) - .setDelHint(delHint) - .setSource(PBHelper.convertDatanodeInfo(source)) - .build(); - - send(out, Op.REPLACE_BLOCK, proto); - } - - @Override - public void copyBlock(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken) throws IOException { - OpCopyBlockProto proto = OpCopyBlockProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) - .build(); - - send(out, Op.COPY_BLOCK, proto); - } - - @Override - public void blockChecksum(final ExtendedBlock blk, - final Token<BlockTokenIdentifier> blockToken) throws IOException { - OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) - .build(); - - send(out, Op.BLOCK_CHECKSUM, proto); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java index 398d44c..852819f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java @@ -24,7 +24,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; -import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; import java.io.IOException; import java.io.InputStream; http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index c62d9ba..3886007 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -128,7 +128,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements throw new ServiceException(e); } return GetBlockLocalPathInfoResponseProto.newBuilder() - .setBlock(PBHelper.convert(resp.getBlock())) + .setBlock(PBHelperClient.convert(resp.getBlock())) .setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath()) .build(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index 2146063..7b427fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -178,7 +178,7 @@ public class ClientDatanodeProtocolTranslatorPB implements @Override public long getReplicaVisibleLength(ExtendedBlock b) throws IOException { GetReplicaVisibleLengthRequestProto req = GetReplicaVisibleLengthRequestProto - .newBuilder().setBlock(PBHelper.convert(b)).build(); + .newBuilder().setBlock(PBHelperClient.convert(b)).build(); try { return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength(); } catch (ServiceException e) { @@ -211,8 +211,8 @@ public class ClientDatanodeProtocolTranslatorPB implements Token<BlockTokenIdentifier> token) throws IOException { GetBlockLocalPathInfoRequestProto req = GetBlockLocalPathInfoRequestProto.newBuilder() - .setBlock(PBHelper.convert(block)) - .setToken(PBHelper.convert(token)).build(); + .setBlock(PBHelperClient.convert(block)) + .setToken(PBHelperClient.convert(token)).build(); GetBlockLocalPathInfoResponseProto resp; try { resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req);