http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java deleted file mode 100644 index c16ffdf..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ /dev/null @@ -1,735 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.DataInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; -import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; -import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.util.DirectBufferPool; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; - -/** - * BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on - * the same machine as the datanode, then the client can read files directly - * from the local file system rather than going through the datanode for better - * performance. <br> - * - * This is the legacy implementation based on HDFS-2246, which requires - * permissions on the datanode to be set so that clients can directly access the - * blocks. The new implementation based on HDFS-347 should be preferred on UNIX - * systems where the required native code has been implemented.<br> - * - * {@link BlockReaderLocalLegacy} works as follows: - * <ul> - * <li>The client performing short circuit reads must be configured at the - * datanode.</li> - * <li>The client gets the path to the file where block is stored using - * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)} - * RPC call</li> - * <li>Client uses kerberos authentication to connect to the datanode over RPC, - * if security is enabled.</li> - * </ul> - */ -@InterfaceAudience.Private -class BlockReaderLocalLegacy implements BlockReader { - private static final Log LOG = LogFactory.getLog(BlockReaderLocalLegacy.class); - - //Stores the cache and proxy for a local datanode. - private static class LocalDatanodeInfo { - private ClientDatanodeProtocol proxy = null; - private final Map<ExtendedBlock, BlockLocalPathInfo> cache; - - LocalDatanodeInfo() { - final int cacheSize = 10000; - final float hashTableLoadFactor = 0.75f; - int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1; - cache = Collections - .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>( - hashTableCapacity, hashTableLoadFactor, true) { - private static final long serialVersionUID = 1; - - @Override - protected boolean removeEldestEntry( - Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) { - return size() > cacheSize; - } - }); - } - - private synchronized ClientDatanodeProtocol getDatanodeProxy( - UserGroupInformation ugi, final DatanodeInfo node, - final Configuration conf, final int socketTimeout, - final boolean connectToDnViaHostname) throws IOException { - if (proxy == null) { - try { - proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() { - @Override - public ClientDatanodeProtocol run() throws Exception { - return DFSUtil.createClientDatanodeProtocolProxy(node, conf, - socketTimeout, connectToDnViaHostname); - } - }); - } catch (InterruptedException e) { - LOG.warn("encountered exception ", e); - } - } - return proxy; - } - - private synchronized void resetDatanodeProxy() { - if (null != proxy) { - RPC.stopProxy(proxy); - proxy = null; - } - } - - private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) { - return cache.get(b); - } - - private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) { - cache.put(b, info); - } - - private void removeBlockLocalPathInfo(ExtendedBlock b) { - cache.remove(b); - } - } - - // Multiple datanodes could be running on the local machine. Store proxies in - // a map keyed by the ipc port of the datanode. - private static final Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>(); - - private final FileInputStream dataIn; // reader for the data file - private final FileInputStream checksumIn; // reader for the checksum file - - /** - * Offset from the most recent chunk boundary at which the next read should - * take place. Is only set to non-zero at construction time, and is - * decremented (usually to 0) by subsequent reads. This avoids having to do a - * checksum read at construction to position the read cursor correctly. - */ - private int offsetFromChunkBoundary; - - private byte[] skipBuf = null; - - /** - * Used for checksummed reads that need to be staged before copying to their - * output buffer because they are either a) smaller than the checksum chunk - * size or b) issued by the slower read(byte[]...) path - */ - private ByteBuffer slowReadBuff = null; - private ByteBuffer checksumBuff = null; - private DataChecksum checksum; - private final boolean verifyChecksum; - - private static final DirectBufferPool bufferPool = new DirectBufferPool(); - - private final int bytesPerChecksum; - private final int checksumSize; - - /** offset in block where reader wants to actually read */ - private long startOffset; - private final String filename; - private long blockId; - - /** - * The only way this object can be instantiated. - */ - static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf, - UserGroupInformation userGroupInformation, - Configuration configuration, String file, ExtendedBlock blk, - Token<BlockTokenIdentifier> token, DatanodeInfo node, - long startOffset, long length, StorageType storageType) - throws IOException { - final ShortCircuitConf scConf = conf.getShortCircuitConf(); - LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node - .getIpcPort()); - // check the cache first - BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk); - if (pathinfo == null) { - if (userGroupInformation == null) { - userGroupInformation = UserGroupInformation.getCurrentUser(); - } - pathinfo = getBlockPathInfo(userGroupInformation, blk, node, - configuration, conf.getSocketTimeout(), token, - conf.isConnectToDnViaHostname(), storageType); - } - - // check to see if the file exists. It may so happen that the - // HDFS file has been deleted and this block-lookup is occurring - // on behalf of a new HDFS file. This time, the block file could - // be residing in a different portion of the fs.data.dir directory. - // In this case, we remove this entry from the cache. The next - // call to this method will re-populate the cache. - FileInputStream dataIn = null; - FileInputStream checksumIn = null; - BlockReaderLocalLegacy localBlockReader = null; - final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums() - || storageType.isTransient(); - try { - // get a local file system - File blkfile = new File(pathinfo.getBlockPath()); - dataIn = new FileInputStream(blkfile); - - if (LOG.isDebugEnabled()) { - LOG.debug("New BlockReaderLocalLegacy for file " + blkfile + " of size " - + blkfile.length() + " startOffset " + startOffset + " length " - + length + " short circuit checksum " + !skipChecksumCheck); - } - - if (!skipChecksumCheck) { - // get the metadata file - File metafile = new File(pathinfo.getMetaPath()); - checksumIn = new FileInputStream(metafile); - - final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( - new DataInputStream(checksumIn), blk); - long firstChunkOffset = startOffset - - (startOffset % checksum.getBytesPerChecksum()); - localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token, - startOffset, length, pathinfo, checksum, true, dataIn, - firstChunkOffset, checksumIn); - } else { - localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token, - startOffset, length, pathinfo, dataIn); - } - } catch (IOException e) { - // remove from cache - localDatanodeInfo.removeBlockLocalPathInfo(blk); - DFSClient.LOG.warn("BlockReaderLocalLegacy: Removing " + blk - + " from cache because local file " + pathinfo.getBlockPath() - + " could not be opened."); - throw e; - } finally { - if (localBlockReader == null) { - if (dataIn != null) { - dataIn.close(); - } - if (checksumIn != null) { - checksumIn.close(); - } - } - } - return localBlockReader; - } - - private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) { - LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port); - if (ldInfo == null) { - ldInfo = new LocalDatanodeInfo(); - localDatanodeInfoMap.put(port, ldInfo); - } - return ldInfo; - } - - private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi, - ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout, - Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname, - StorageType storageType) throws IOException { - LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort()); - BlockLocalPathInfo pathinfo = null; - ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node, - conf, timeout, connectToDnViaHostname); - try { - // make RPC to local datanode to find local pathnames of blocks - pathinfo = proxy.getBlockLocalPathInfo(blk, token); - // We cannot cache the path information for a replica on transient storage. - // If the replica gets evicted, then it moves to a different path. Then, - // our next attempt to read from the cached path would fail to find the - // file. Additionally, the failure would cause us to disable legacy - // short-circuit read for all subsequent use in the ClientContext. Unlike - // the newer short-circuit read implementation, we have no communication - // channel for the DataNode to notify the client that the path has been - // invalidated. Therefore, our only option is to skip caching. - if (pathinfo != null && !storageType.isTransient()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cached location of block " + blk + " as " + pathinfo); - } - localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo); - } - } catch (IOException e) { - localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error - throw e; - } - return pathinfo; - } - - private static int getSlowReadBufferNumChunks(int bufferSizeBytes, - int bytesPerChecksum) { - if (bufferSizeBytes < bytesPerChecksum) { - throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " + - "buffer size (" + bufferSizeBytes + ") is not large enough to hold " + - "a single chunk (" + bytesPerChecksum + "). Please configure " + - HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY + - " appropriately"); - } - - // Round down to nearest chunk size - return bufferSizeBytes / bytesPerChecksum; - } - - private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, - ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset, - long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn) - throws IOException { - this(conf, hdfsfile, block, token, startOffset, length, pathinfo, - DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false, - dataIn, startOffset, null); - } - - private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, - ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset, - long length, BlockLocalPathInfo pathinfo, DataChecksum checksum, - boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, - FileInputStream checksumIn) throws IOException { - this.filename = hdfsfile; - this.checksum = checksum; - this.verifyChecksum = verifyChecksum; - this.startOffset = Math.max(startOffset, 0); - this.blockId = block.getBlockId(); - - bytesPerChecksum = this.checksum.getBytesPerChecksum(); - checksumSize = this.checksum.getChecksumSize(); - - this.dataIn = dataIn; - this.checksumIn = checksumIn; - this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); - - final int chunksPerChecksumRead = getSlowReadBufferNumChunks( - conf.getShortCircuitBufferSize(), bytesPerChecksum); - slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead); - checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); - // Initially the buffers have nothing to read. - slowReadBuff.flip(); - checksumBuff.flip(); - boolean success = false; - try { - // Skip both input streams to beginning of the chunk containing startOffset - IOUtils.skipFully(dataIn, firstChunkOffset); - if (checksumIn != null) { - long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize; - IOUtils.skipFully(checksumIn, checkSumOffset); - } - success = true; - } finally { - if (!success) { - bufferPool.returnBuffer(slowReadBuff); - bufferPool.returnBuffer(checksumBuff); - } - } - } - - /** - * Reads bytes into a buffer until EOF or the buffer's limit is reached - */ - private int fillBuffer(FileInputStream stream, ByteBuffer buf) - throws IOException { - TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" + - blockId + ")", Sampler.NEVER); - try { - int bytesRead = stream.getChannel().read(buf); - if (bytesRead < 0) { - //EOF - return bytesRead; - } - while (buf.remaining() > 0) { - int n = stream.getChannel().read(buf); - if (n < 0) { - //EOF - return bytesRead; - } - bytesRead += n; - } - return bytesRead; - } finally { - scope.close(); - } - } - - /** - * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into - * another. - */ - private void writeSlice(ByteBuffer from, ByteBuffer to, int length) { - int oldLimit = from.limit(); - from.limit(from.position() + length); - try { - to.put(from); - } finally { - from.limit(oldLimit); - } - } - - @Override - public synchronized int read(ByteBuffer buf) throws IOException { - int nRead = 0; - if (verifyChecksum) { - // A 'direct' read actually has three phases. The first drains any - // remaining bytes from the slow read buffer. After this the read is - // guaranteed to be on a checksum chunk boundary. If there are still bytes - // to read, the fast direct path is used for as many remaining bytes as - // possible, up to a multiple of the checksum chunk size. Finally, any - // 'odd' bytes remaining at the end of the read cause another slow read to - // be issued, which involves an extra copy. - - // Every 'slow' read tries to fill the slow read buffer in one go for - // efficiency's sake. As described above, all non-checksum-chunk-aligned - // reads will be served from the slower read path. - - if (slowReadBuff.hasRemaining()) { - // There are remaining bytes from a small read available. This usually - // means this read is unaligned, which falls back to the slow path. - int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining()); - writeSlice(slowReadBuff, buf, fromSlowReadBuff); - nRead += fromSlowReadBuff; - } - - if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) { - // Since we have drained the 'small read' buffer, we are guaranteed to - // be chunk-aligned - int len = buf.remaining() - (buf.remaining() % bytesPerChecksum); - - // There's only enough checksum buffer space available to checksum one - // entire slow read buffer. This saves keeping the number of checksum - // chunks around. - len = Math.min(len, slowReadBuff.capacity()); - int oldlimit = buf.limit(); - buf.limit(buf.position() + len); - int readResult = 0; - try { - readResult = doByteBufferRead(buf); - } finally { - buf.limit(oldlimit); - } - if (readResult == -1) { - return nRead; - } else { - nRead += readResult; - buf.position(buf.position() + readResult); - } - } - - // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read - // until chunk boundary - if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) { - int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary); - int readResult = fillSlowReadBuffer(toRead); - if (readResult == -1) { - return nRead; - } else { - int fromSlowReadBuff = Math.min(readResult, buf.remaining()); - writeSlice(slowReadBuff, buf, fromSlowReadBuff); - nRead += fromSlowReadBuff; - } - } - } else { - // Non-checksummed reads are much easier; we can just fill the buffer directly. - nRead = doByteBufferRead(buf); - if (nRead > 0) { - buf.position(buf.position() + nRead); - } - } - return nRead; - } - - /** - * Tries to read as many bytes as possible into supplied buffer, checksumming - * each chunk if needed. - * - * <b>Preconditions:</b> - * <ul> - * <li> - * If checksumming is enabled, buf.remaining must be a multiple of - * bytesPerChecksum. Note that this is not a requirement for clients of - * read(ByteBuffer) - in the case of non-checksum-sized read requests, - * read(ByteBuffer) will substitute a suitably sized buffer to pass to this - * method. - * </li> - * </ul> - * <b>Postconditions:</b> - * <ul> - * <li>buf.limit and buf.mark are unchanged.</li> - * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the - * requested bytes can be read straight from the buffer</li> - * </ul> - * - * @param buf - * byte buffer to write bytes to. If checksums are not required, buf - * can have any number of bytes remaining, otherwise there must be a - * multiple of the checksum chunk size remaining. - * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt> - * that is, the the number of useful bytes (up to the amount - * requested) readable from the buffer by the client. - */ - private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException { - if (verifyChecksum) { - assert buf.remaining() % bytesPerChecksum == 0; - } - int dataRead = -1; - - int oldpos = buf.position(); - // Read as much as we can into the buffer. - dataRead = fillBuffer(dataIn, buf); - - if (dataRead == -1) { - return -1; - } - - if (verifyChecksum) { - ByteBuffer toChecksum = buf.duplicate(); - toChecksum.position(oldpos); - toChecksum.limit(oldpos + dataRead); - - checksumBuff.clear(); - // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum ); - int numChunks = - (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum; - checksumBuff.limit(checksumSize * numChunks); - - fillBuffer(checksumIn, checksumBuff); - checksumBuff.flip(); - - checksum.verifyChunkedSums(toChecksum, checksumBuff, filename, - this.startOffset); - } - - if (dataRead >= 0) { - buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead)); - } - - if (dataRead < offsetFromChunkBoundary) { - // yikes, didn't even get enough bytes to honour offset. This can happen - // even if we are verifying checksums if we are at EOF. - offsetFromChunkBoundary -= dataRead; - dataRead = 0; - } else { - dataRead -= offsetFromChunkBoundary; - offsetFromChunkBoundary = 0; - } - - return dataRead; - } - - /** - * Ensures that up to len bytes are available and checksummed in the slow read - * buffer. The number of bytes available to read is returned. If the buffer is - * not already empty, the number of remaining bytes is returned and no actual - * read happens. - * - * @param len - * the maximum number of bytes to make available. After len bytes - * are read, the underlying bytestream <b>must</b> be at a checksum - * boundary, or EOF. That is, (len + currentPosition) % - * bytesPerChecksum == 0. - * @return the number of bytes available to read, or -1 if EOF. - */ - private synchronized int fillSlowReadBuffer(int len) throws IOException { - int nRead = -1; - if (slowReadBuff.hasRemaining()) { - // Already got data, good to go. - nRead = Math.min(len, slowReadBuff.remaining()); - } else { - // Round a complete read of len bytes (plus any implicit offset) to the - // next chunk boundary, since we try and read in multiples of a chunk - int nextChunk = len + offsetFromChunkBoundary + - (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum)); - int limit = Math.min(nextChunk, slowReadBuff.capacity()); - assert limit % bytesPerChecksum == 0; - - slowReadBuff.clear(); - slowReadBuff.limit(limit); - - nRead = doByteBufferRead(slowReadBuff); - - if (nRead > 0) { - // So that next time we call slowReadBuff.hasRemaining(), we don't get a - // false positive. - slowReadBuff.limit(nRead + slowReadBuff.position()); - } - } - return nRead; - } - - @Override - public synchronized int read(byte[] buf, int off, int len) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("read off " + off + " len " + len); - } - if (!verifyChecksum) { - return dataIn.read(buf, off, len); - } - - int nRead = fillSlowReadBuffer(slowReadBuff.capacity()); - - if (nRead > 0) { - // Possible that buffer is filled with a larger read than we need, since - // we tried to read as much as possible at once - nRead = Math.min(len, nRead); - slowReadBuff.get(buf, off, nRead); - } - - return nRead; - } - - @Override - public synchronized long skip(long n) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("skip " + n); - } - if (n <= 0) { - return 0; - } - if (!verifyChecksum) { - return dataIn.skip(n); - } - - // caller made sure newPosition is not beyond EOF. - int remaining = slowReadBuff.remaining(); - int position = slowReadBuff.position(); - int newPosition = position + (int)n; - - // if the new offset is already read into dataBuff, just reposition - if (n <= remaining) { - assert offsetFromChunkBoundary == 0; - slowReadBuff.position(newPosition); - return n; - } - - // for small gap, read through to keep the data/checksum in sync - if (n - remaining <= bytesPerChecksum) { - slowReadBuff.position(position + remaining); - if (skipBuf == null) { - skipBuf = new byte[bytesPerChecksum]; - } - int ret = read(skipBuf, 0, (int)(n - remaining)); - return (remaining + ret); - } - - // optimize for big gap: discard the current buffer, skip to - // the beginning of the appropriate checksum chunk and then - // read to the middle of that chunk to be in sync with checksums. - - // We can't use this.offsetFromChunkBoundary because we need to know how - // many bytes of the offset were really read. Calling read(..) with a - // positive this.offsetFromChunkBoundary causes that many bytes to get - // silently skipped. - int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum; - long toskip = n - remaining - myOffsetFromChunkBoundary; - - slowReadBuff.position(slowReadBuff.limit()); - checksumBuff.position(checksumBuff.limit()); - - IOUtils.skipFully(dataIn, toskip); - long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize; - IOUtils.skipFully(checksumIn, checkSumOffset); - - // read into the middle of the chunk - if (skipBuf == null) { - skipBuf = new byte[bytesPerChecksum]; - } - assert skipBuf.length == bytesPerChecksum; - assert myOffsetFromChunkBoundary < bytesPerChecksum; - - int ret = read(skipBuf, 0, myOffsetFromChunkBoundary); - - if (ret == -1) { // EOS - return (toskip + remaining); - } else { - return (toskip + remaining + ret); - } - } - - @Override - public synchronized void close() throws IOException { - IOUtils.cleanup(LOG, dataIn, checksumIn); - if (slowReadBuff != null) { - bufferPool.returnBuffer(slowReadBuff); - slowReadBuff = null; - } - if (checksumBuff != null) { - bufferPool.returnBuffer(checksumBuff); - checksumBuff = null; - } - startOffset = -1; - checksum = null; - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return BlockReaderUtil.readAll(this, buf, offset, len); - } - - @Override - public void readFully(byte[] buf, int off, int len) throws IOException { - BlockReaderUtil.readFully(this, buf, off, len); - } - - @Override - public int available() throws IOException { - // We never do network I/O in BlockReaderLocalLegacy. - return Integer.MAX_VALUE; - } - - @Override - public boolean isLocal() { - return true; - } - - @Override - public boolean isShortCircuit() { - return true; - } - - @Override - public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { - return null; - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java deleted file mode 100644 index dbc528e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.hadoop.classification.InterfaceAudience; - -import java.io.IOException; - -/** - * For sharing between the local and remote block reader implementations. - */ -@InterfaceAudience.Private -class BlockReaderUtil { - - /* See {@link BlockReader#readAll(byte[], int, int)} */ - public static int readAll(BlockReader reader, - byte[] buf, int offset, int len) throws IOException { - int n = 0; - for (;;) { - int nread = reader.read(buf, offset + n, len - n); - if (nread <= 0) - return (n == 0) ? nread : n; - n += nread; - if (n >= len) - return n; - } - } - - /* See {@link BlockReader#readFully(byte[], int, int)} */ - public static void readFully(BlockReader reader, - byte[] buf, int off, int len) throws IOException { - int toRead = len; - while (toRead > 0) { - int ret = reader.read(buf, off, toRead); - if (ret < 0) { - throw new IOException("Premature EOF from inputStream"); - } - toRead -= ret; - off += ret; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java deleted file mode 100644 index bf11463..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.util.HashMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; -import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; -import org.apache.hadoop.hdfs.util.ByteArrayManager; - -import com.google.common.annotations.VisibleForTesting; - -/** - * ClientContext contains context information for a client. - * - * This allows us to share caches such as the socket cache across - * DFSClient instances. - */ -@InterfaceAudience.Private -public class ClientContext { - private static final Log LOG = LogFactory.getLog(ClientContext.class); - - /** - * Global map of context names to caches contexts. - */ - private final static HashMap<String, ClientContext> CACHES = - new HashMap<String, ClientContext>(); - - /** - * Name of context. - */ - private final String name; - - /** - * String representation of the configuration. - */ - private final String confString; - - /** - * Caches short-circuit file descriptors, mmap regions. - */ - private final ShortCircuitCache shortCircuitCache; - - /** - * Caches TCP and UNIX domain sockets for reuse. - */ - private final PeerCache peerCache; - - /** - * Stores information about socket paths. - */ - private final DomainSocketFactory domainSocketFactory; - - /** - * Caches key Providers for the DFSClient - */ - private final KeyProviderCache keyProviderCache; - /** - * True if we should use the legacy BlockReaderLocal. - */ - private final boolean useLegacyBlockReaderLocal; - - /** - * True if the legacy BlockReaderLocal is disabled. - * - * The legacy block reader local gets disabled completely whenever there is an - * error or miscommunication. The new block reader local code handles this - * case more gracefully inside DomainSocketFactory. - */ - private volatile boolean disableLegacyBlockReaderLocal = false; - - /** Creating byte[] for {@link DFSOutputStream}. */ - private final ByteArrayManager byteArrayManager; - - /** - * Whether or not we complained about a DFSClient fetching a CacheContext that - * didn't match its config values yet. - */ - private boolean printedConfWarning = false; - - private ClientContext(String name, DfsClientConf conf) { - final ShortCircuitConf scConf = conf.getShortCircuitConf(); - - this.name = name; - this.confString = scConf.confAsString(); - this.shortCircuitCache = ShortCircuitCache.fromConf(scConf); - this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(), - scConf.getSocketCacheExpiry()); - this.keyProviderCache = new KeyProviderCache( - scConf.getKeyProviderCacheExpiryMs()); - this.useLegacyBlockReaderLocal = scConf.isUseLegacyBlockReaderLocal(); - this.domainSocketFactory = new DomainSocketFactory(scConf); - - this.byteArrayManager = ByteArrayManager.newInstance( - conf.getWriteByteArrayManagerConf()); - } - - public static ClientContext get(String name, DfsClientConf conf) { - ClientContext context; - synchronized(ClientContext.class) { - context = CACHES.get(name); - if (context == null) { - context = new ClientContext(name, conf); - CACHES.put(name, context); - } else { - context.printConfWarningIfNeeded(conf); - } - } - return context; - } - - /** - * Get a client context, from a Configuration object. - * - * This method is less efficient than the version which takes a DFSClient#Conf - * object, and should be mostly used by tests. - */ - @VisibleForTesting - public static ClientContext getFromConf(Configuration conf) { - return get(conf.get(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, - HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), - new DfsClientConf(conf)); - } - - private void printConfWarningIfNeeded(DfsClientConf conf) { - String existing = this.getConfString(); - String requested = conf.getShortCircuitConf().confAsString(); - if (!existing.equals(requested)) { - if (!printedConfWarning) { - printedConfWarning = true; - LOG.warn("Existing client context '" + name + "' does not match " + - "requested configuration. Existing: " + existing + - ", Requested: " + requested); - } - } - } - - public String getConfString() { - return confString; - } - - public ShortCircuitCache getShortCircuitCache() { - return shortCircuitCache; - } - - public PeerCache getPeerCache() { - return peerCache; - } - - public KeyProviderCache getKeyProviderCache() { - return keyProviderCache; - } - - public boolean getUseLegacyBlockReaderLocal() { - return useLegacyBlockReaderLocal; - } - - public boolean getDisableLegacyBlockReaderLocal() { - return disableLegacyBlockReaderLocal; - } - - public void setDisableLegacyBlockReaderLocal() { - disableLegacyBlockReaderLocal = true; - } - - public DomainSocketFactory getDomainSocketFactory() { - return domainSocketFactory; - } - - public ByteArrayManager getByteArrayManager() { - return byteArrayManager; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 57a5aed..6420b55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -80,8 +80,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024; public static final String DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY = "dfs.datanode.balance.max.concurrent.moves"; public static final int DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 5; - public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes"; - public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB + @Deprecated + public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = + HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY; + @Deprecated + public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = + HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT; public static final String DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes"; public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false; public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes"; @@ -505,7 +509,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_WEB_UGI_KEY = "dfs.web.ugi"; public static final String DFS_NAMENODE_STARTUP_KEY = "dfs.namenode.startup"; public static final String DFS_DATANODE_KEYTAB_FILE_KEY = "dfs.datanode.keytab.file"; - public static final String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal"; + public static final String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = + HdfsClientConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; @Deprecated public static final String DFS_DATANODE_USER_NAME_KEY = DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; public static final String DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS = "dfs.datanode.shared.file.descriptor.paths"; @@ -604,7 +609,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class"; public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100; public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses"; - public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri"; + public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI = + HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI; // Journal-node related configs. These are read on the JN side. public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 7f3722f..139a27c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -364,7 +364,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, ClientDatanodeProtocol cdp = null; try { - cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, + cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode, dfsClient.getConfiguration(), conf.getSocketTimeout(), conf.isConnectToDnViaHostname(), locatedblock); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index cae56c0..5c8a700 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -53,8 +53,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; -import javax.net.SocketFactory; - import com.google.common.collect.Sets; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -69,16 +67,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; -import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.http.HttpConfig; @@ -932,29 +925,6 @@ public class DFSUtil { public static int roundBytesToGB(long bytes) { return Math.round((float)bytes/ 1024 / 1024 / 1024); } - - /** Create a {@link ClientDatanodeProtocol} proxy */ - public static ClientDatanodeProtocol createClientDatanodeProtocolProxy( - DatanodeID datanodeid, Configuration conf, int socketTimeout, - boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException { - return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout, - connectToDnViaHostname, locatedBlock); - } - - /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */ - public static ClientDatanodeProtocol createClientDatanodeProtocolProxy( - DatanodeID datanodeid, Configuration conf, int socketTimeout, - boolean connectToDnViaHostname) throws IOException { - return new ClientDatanodeProtocolTranslatorPB( - datanodeid, conf, socketTimeout, connectToDnViaHostname); - } - - /** Create a {@link ClientDatanodeProtocol} proxy */ - public static ClientDatanodeProtocol createClientDatanodeProtocolProxy( - InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory) throws IOException { - return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory); - } /** * Get nameservice Id for the {@link NameNode} based on namenode RPC address @@ -1450,41 +1420,6 @@ public class DFSUtil { } /** - * Creates a new KeyProvider from the given Configuration. - * - * @param conf Configuration - * @return new KeyProvider, or null if no provider was found. - * @throws IOException if the KeyProvider is improperly specified in - * the Configuration - */ - public static KeyProvider createKeyProvider( - final Configuration conf) throws IOException { - final String providerUriStr = - conf.getTrimmed(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, ""); - // No provider set in conf - if (providerUriStr.isEmpty()) { - return null; - } - final URI providerUri; - try { - providerUri = new URI(providerUriStr); - } catch (URISyntaxException e) { - throw new IOException(e); - } - KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf); - if (keyProvider == null) { - throw new IOException("Could not instantiate KeyProvider from " + - DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + " setting of '" + - providerUriStr +"'"); - } - if (keyProvider.isTransient()) { - throw new IOException("KeyProvider " + keyProvider.toString() - + " was found but it is a transient provider."); - } - return keyProvider; - } - - /** * Creates a new KeyProviderCryptoExtension by wrapping the * KeyProvider specified in the given Configuration. * @@ -1495,7 +1430,7 @@ public class DFSUtil { */ public static KeyProviderCryptoExtension createKeyProviderCryptoExtension( final Configuration conf) throws IOException { - KeyProvider keyProvider = createKeyProvider(conf); + KeyProvider keyProvider = DFSUtilClient.createKeyProvider(conf); if (keyProvider == null) { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java deleted file mode 100644 index e135d8e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.EnumSet; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; - -/** - * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from - * replicas. - */ -@InterfaceAudience.Private -public final class ExternalBlockReader implements BlockReader { - private final ReplicaAccessor accessor; - private final long visibleLength; - private long pos; - - ExternalBlockReader(ReplicaAccessor accessor, long visibleLength, - long startOffset) { - this.accessor = accessor; - this.visibleLength = visibleLength; - this.pos = startOffset; - } - - @Override - public int read(byte[] buf, int off, int len) throws IOException { - int nread = accessor.read(pos, buf, off, len); - pos += nread; - return nread; - } - - @Override - public int read(ByteBuffer buf) throws IOException { - int nread = accessor.read(pos, buf); - pos += nread; - return nread; - } - - @Override - public long skip(long n) throws IOException { - // You cannot skip backwards - if (n <= 0) { - return 0; - } - // You can't skip past the end of the replica. - long oldPos = pos; - pos += n; - if (pos > visibleLength) { - pos = visibleLength; - } - return pos - oldPos; - } - - @Override - public int available() throws IOException { - // We return the amount of bytes that we haven't read yet from the - // replica, based on our current position. Some of the other block - // readers return a shorter length than that. The only advantage to - // returning a shorter length is that the DFSInputStream will - // trash your block reader and create a new one if someone tries to - // seek() beyond the available() region. - long diff = visibleLength - pos; - if (diff > Integer.MAX_VALUE) { - return Integer.MAX_VALUE; - } else { - return (int)diff; - } - } - - @Override - public void close() throws IOException { - accessor.close(); - } - - @Override - public void readFully(byte[] buf, int offset, int len) throws IOException { - BlockReaderUtil.readFully(this, buf, offset, len); - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return BlockReaderUtil.readAll(this, buf, offset, len); - } - - @Override - public boolean isLocal() { - return accessor.isLocal(); - } - - @Override - public boolean isShortCircuit() { - return accessor.isShortCircuit(); - } - - @Override - public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { - // For now, pluggable ReplicaAccessors do not support zero-copy. - return null; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java deleted file mode 100644 index a2b6c7e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.crypto.key.KeyProvider; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; - -@InterfaceAudience.Private -public class KeyProviderCache { - - public static final Log LOG = LogFactory.getLog(KeyProviderCache.class); - - private final Cache<URI, KeyProvider> cache; - - public KeyProviderCache(long expiryMs) { - cache = CacheBuilder.newBuilder() - .expireAfterAccess(expiryMs, TimeUnit.MILLISECONDS) - .removalListener(new RemovalListener<URI, KeyProvider>() { - @Override - public void onRemoval( - RemovalNotification<URI, KeyProvider> notification) { - try { - notification.getValue().close(); - } catch (Throwable e) { - LOG.error( - "Error closing KeyProvider with uri [" - + notification.getKey() + "]", e); - ; - } - } - }) - .build(); - } - - public KeyProvider get(final Configuration conf) { - URI kpURI = createKeyProviderURI(conf); - if (kpURI == null) { - return null; - } - try { - return cache.get(kpURI, new Callable<KeyProvider>() { - @Override - public KeyProvider call() throws Exception { - return DFSUtil.createKeyProvider(conf); - } - }); - } catch (Exception e) { - LOG.error("Could not create KeyProvider for DFSClient !!", e.getCause()); - return null; - } - } - - private URI createKeyProviderURI(Configuration conf) { - final String providerUriStr = - conf.getTrimmed(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, ""); - // No provider set in conf - if (providerUriStr.isEmpty()) { - LOG.error("Could not find uri with key [" - + DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI - + "] to create a keyProvider !!"); - return null; - } - final URI providerUri; - try { - providerUri = new URI(providerUriStr); - } catch (URISyntaxException e) { - LOG.error("KeyProvider URI string is invalid [" + providerUriStr - + "]!!", e.getCause()); - return null; - } - return providerUri; - } - - @VisibleForTesting - public void setKeyProvider(Configuration conf, KeyProvider keyProvider) - throws IOException { - URI uri = createKeyProviderURI(conf); - cache.put(uri, keyProvider); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java deleted file mode 100644 index 08b0468..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java +++ /dev/null @@ -1,290 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.LinkedListMultimap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.Time; - -/** - * A cache of input stream sockets to Data Node. - */ -@InterfaceStability.Unstable -@InterfaceAudience.Private -@VisibleForTesting -public class PeerCache { - private static final Log LOG = LogFactory.getLog(PeerCache.class); - - private static class Key { - final DatanodeID dnID; - final boolean isDomain; - - Key(DatanodeID dnID, boolean isDomain) { - this.dnID = dnID; - this.isDomain = isDomain; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof Key)) { - return false; - } - Key other = (Key)o; - return dnID.equals(other.dnID) && isDomain == other.isDomain; - } - - @Override - public int hashCode() { - return dnID.hashCode() ^ (isDomain ? 1 : 0); - } - } - - private static class Value { - private final Peer peer; - private final long time; - - Value(Peer peer, long time) { - this.peer = peer; - this.time = time; - } - - Peer getPeer() { - return peer; - } - - long getTime() { - return time; - } - } - - private Daemon daemon; - /** A map for per user per datanode. */ - private final LinkedListMultimap<Key, Value> multimap = - LinkedListMultimap.create(); - private final int capacity; - private final long expiryPeriod; - - public PeerCache(int c, long e) { - this.capacity = c; - this.expiryPeriod = e; - - if (capacity == 0 ) { - LOG.info("SocketCache disabled."); - } else if (expiryPeriod == 0) { - throw new IllegalStateException("Cannot initialize expiryPeriod to " + - expiryPeriod + " when cache is enabled."); - } - } - - private boolean isDaemonStarted() { - return (daemon == null)? false: true; - } - - private synchronized void startExpiryDaemon() { - // start daemon only if not already started - if (isDaemonStarted() == true) { - return; - } - - daemon = new Daemon(new Runnable() { - @Override - public void run() { - try { - PeerCache.this.run(); - } catch(InterruptedException e) { - //noop - } finally { - PeerCache.this.clear(); - } - } - - @Override - public String toString() { - return String.valueOf(PeerCache.this); - } - }); - daemon.start(); - } - - /** - * Get a cached peer connected to the given DataNode. - * @param dnId The DataNode to get a Peer for. - * @param isDomain Whether to retrieve a DomainPeer or not. - * - * @return An open Peer connected to the DN, or null if none - * was found. - */ - public Peer get(DatanodeID dnId, boolean isDomain) { - - if (capacity <= 0) { // disabled - return null; - } - return getInternal(dnId, isDomain); - } - - private synchronized Peer getInternal(DatanodeID dnId, boolean isDomain) { - List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain)); - if (sockStreamList == null) { - return null; - } - - Iterator<Value> iter = sockStreamList.iterator(); - while (iter.hasNext()) { - Value candidate = iter.next(); - iter.remove(); - long ageMs = Time.monotonicNow() - candidate.getTime(); - Peer peer = candidate.getPeer(); - if (ageMs >= expiryPeriod) { - try { - peer.close(); - } catch (IOException e) { - LOG.warn("got IOException closing stale peer " + peer + - ", which is " + ageMs + " ms old"); - } - } else if (!peer.isClosed()) { - return peer; - } - } - return null; - } - - /** - * Give an unused socket to the cache. - */ - public void put(DatanodeID dnId, Peer peer) { - Preconditions.checkNotNull(dnId); - Preconditions.checkNotNull(peer); - if (peer.isClosed()) return; - if (capacity <= 0) { - // Cache disabled. - IOUtils.cleanup(LOG, peer); - return; - } - putInternal(dnId, peer); - } - - private synchronized void putInternal(DatanodeID dnId, Peer peer) { - startExpiryDaemon(); - - if (capacity == multimap.size()) { - evictOldest(); - } - multimap.put(new Key(dnId, peer.getDomainSocket() != null), - new Value(peer, Time.monotonicNow())); - } - - public synchronized int size() { - return multimap.size(); - } - - /** - * Evict and close sockets older than expiry period from the cache. - */ - private synchronized void evictExpired(long expiryPeriod) { - while (multimap.size() != 0) { - Iterator<Entry<Key, Value>> iter = - multimap.entries().iterator(); - Entry<Key, Value> entry = iter.next(); - // if oldest socket expired, remove it - if (entry == null || - Time.monotonicNow() - entry.getValue().getTime() < - expiryPeriod) { - break; - } - IOUtils.cleanup(LOG, entry.getValue().getPeer()); - iter.remove(); - } - } - - /** - * Evict the oldest entry in the cache. - */ - private synchronized void evictOldest() { - // We can get the oldest element immediately, because of an interesting - // property of LinkedListMultimap: its iterator traverses entries in the - // order that they were added. - Iterator<Entry<Key, Value>> iter = - multimap.entries().iterator(); - if (!iter.hasNext()) { - throw new IllegalStateException("Cannot evict from empty cache! " + - "capacity: " + capacity); - } - Entry<Key, Value> entry = iter.next(); - IOUtils.cleanup(LOG, entry.getValue().getPeer()); - iter.remove(); - } - - /** - * Periodically check in the cache and expire the entries - * older than expiryPeriod minutes - */ - private void run() throws InterruptedException { - for(long lastExpiryTime = Time.monotonicNow(); - !Thread.interrupted(); - Thread.sleep(expiryPeriod)) { - final long elapsed = Time.monotonicNow() - lastExpiryTime; - if (elapsed >= expiryPeriod) { - evictExpired(expiryPeriod); - lastExpiryTime = Time.monotonicNow(); - } - } - clear(); - throw new InterruptedException("Daemon Interrupted"); - } - - /** - * Empty the cache, and close all sockets. - */ - @VisibleForTesting - synchronized void clear() { - for (Value value : multimap.values()) { - IOUtils.cleanup(LOG, value.getPeer()); - } - multimap.clear(); - } - - @VisibleForTesting - void close() { - clear(); - if (daemon != null) { - daemon.interrupt(); - try { - daemon.join(); - } catch (InterruptedException e) { - throw new RuntimeException("failed to join thread"); - } - } - daemon = null; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java deleted file mode 100644 index 07f4836..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.client; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * Options that can be specified when manually triggering a block report. - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public final class BlockReportOptions { - private final boolean incremental; - - private BlockReportOptions(boolean incremental) { - this.incremental = incremental; - } - - public boolean isIncremental() { - return incremental; - } - - public static class Factory { - private boolean incremental = false; - - public Factory() { - } - - public Factory setIncremental(boolean incremental) { - this.incremental = incremental; - return this; - } - - public BlockReportOptions build() { - return new BlockReportOptions(incremental); - } - } - - @Override - public String toString() { - return "BlockReportOptions{incremental=" + incremental + "}"; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java deleted file mode 100644 index 69fa52d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * A block and the full path information to the block data file and - * the metadata file stored on the local file system. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class BlockLocalPathInfo { - private final ExtendedBlock block; - private String localBlockPath = ""; // local file storing the data - private String localMetaPath = ""; // local file storing the checksum - - /** - * Constructs BlockLocalPathInfo. - * @param b The block corresponding to this lock path info. - * @param file Block data file. - * @param metafile Metadata file for the block. - */ - public BlockLocalPathInfo(ExtendedBlock b, String file, String metafile) { - block = b; - localBlockPath = file; - localMetaPath = metafile; - } - - /** - * Get the Block data file. - * @return Block data file. - */ - public String getBlockPath() {return localBlockPath;} - - /** - * @return the Block - */ - public ExtendedBlock getBlock() { return block;} - - /** - * Get the Block metadata file. - * @return Block metadata file. - */ - public String getMetaPath() {return localMetaPath;} - - /** - * Get number of bytes in the block. - * @return Number of bytes in the block. - */ - public long getNumBytes() { - return block.getNumBytes(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java deleted file mode 100644 index da8f4ab..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.ReconfigurationTaskStatus; -import org.apache.hadoop.hdfs.client.BlockReportOptions; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; -import org.apache.hadoop.security.KerberosInfo; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenInfo; - -/** An client-datanode protocol for block recovery - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -@KerberosInfo( - serverPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY) -@TokenInfo(BlockTokenSelector.class) -public interface ClientDatanodeProtocol { - /** - * Until version 9, this class ClientDatanodeProtocol served as both - * the client interface to the DN AND the RPC protocol used to - * communicate with the NN. - * - * This class is used by both the DFSClient and the - * DN server side to insulate from the protocol serialization. - * - * If you are adding/changing DN's interface then you need to - * change both this class and ALSO related protocol buffer - * wire protocol definition in ClientDatanodeProtocol.proto. - * - * For more details on protocol buffer wire protocol, please see - * .../org/apache/hadoop/hdfs/protocolPB/overview.html - * - * The log of historical changes can be retrieved from the svn). - * 9: Added deleteBlockPool method - * - * 9 is the last version id when this class was used for protocols - * serialization. DO not update this version any further. - */ - public static final long versionID = 9L; - - /** Return the visible length of a replica. */ - long getReplicaVisibleLength(ExtendedBlock b) throws IOException; - - /** - * Refresh the list of federated namenodes from updated configuration - * Adds new namenodes and stops the deleted namenodes. - * - * @throws IOException on error - **/ - void refreshNamenodes() throws IOException; - - /** - * Delete the block pool directory. If force is false it is deleted only if - * it is empty, otherwise it is deleted along with its contents. - * - * @param bpid Blockpool id to be deleted. - * @param force If false blockpool directory is deleted only if it is empty - * i.e. if it doesn't contain any block files, otherwise it is - * deleted along with its contents. - * @throws IOException - */ - void deleteBlockPool(String bpid, boolean force) throws IOException; - - /** - * Retrieves the path names of the block file and metadata file stored on the - * local file system. - * - * In order for this method to work, one of the following should be satisfied: - * <ul> - * <li> - * The client user must be configured at the datanode to be able to use this - * method.</li> - * <li> - * When security is enabled, kerberos authentication must be used to connect - * to the datanode.</li> - * </ul> - * - * @param block - * the specified block on the local datanode - * @param token - * the block access token. - * @return the BlockLocalPathInfo of a block - * @throws IOException - * on error - */ - BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, - Token<BlockTokenIdentifier> token) throws IOException; - - /** - * Shuts down a datanode. - * - * @param forUpgrade If true, data node does extra prep work before shutting - * down. The work includes advising clients to wait and saving - * certain states for quick restart. This should only be used when - * the stored data will remain the same during upgrade/restart. - * @throws IOException - */ - void shutdownDatanode(boolean forUpgrade) throws IOException; - - /** - * Obtains datanode info - * - * @return software/config version and uptime of the datanode - */ - DatanodeLocalInfo getDatanodeInfo() throws IOException; - - /** - * Asynchronously reload configuration on disk and apply changes. - */ - void startReconfiguration() throws IOException; - - /** - * Get the status of the previously issued reconfig task. - * @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}. - */ - ReconfigurationTaskStatus getReconfigurationStatus() throws IOException; - - /** - * Get a list of allowed properties for reconfiguration. - */ - List<String> listReconfigurableProperties() throws IOException; - - /** - * Trigger a new block report. - */ - void triggerBlockReport(BlockReportOptions options) - throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java deleted file mode 100644 index 170467e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * Encryption key verification failed. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class InvalidEncryptionKeyException extends IOException { - private static final long serialVersionUID = 0l; - - public InvalidEncryptionKeyException() { - super(); - } - - public InvalidEncryptionKeyException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 694f521..85da414 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCustomProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; @@ -115,7 +114,7 @@ public abstract class Receiver implements DataTransferProtocol { TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName()); try { - readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + readBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), proto.getOffset(), @@ -136,7 +135,7 @@ public abstract class Receiver implements DataTransferProtocol { TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName()); try { - writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + writeBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()), PBHelperClient.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), @@ -167,7 +166,7 @@ public abstract class Receiver implements DataTransferProtocol { TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName()); try { - transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + transferBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), targets, @@ -186,7 +185,7 @@ public abstract class Receiver implements DataTransferProtocol { TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName()); try { - requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()), + requestShortCircuitFds(PBHelperClient.convert(proto.getHeader().getBlock()), PBHelper.convert(proto.getHeader().getToken()), slotId, proto.getMaxVersion(), proto.getSupportsReceiptVerification()); @@ -228,7 +227,7 @@ public abstract class Receiver implements DataTransferProtocol { TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName()); try { - replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), + replaceBlock(PBHelperClient.convert(proto.getHeader().getBlock()), PBHelperClient.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getToken()), proto.getDelHint(), @@ -244,7 +243,7 @@ public abstract class Receiver implements DataTransferProtocol { TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName()); try { - copyBlock(PBHelper.convert(proto.getHeader().getBlock()), + copyBlock(PBHelperClient.convert(proto.getHeader().getBlock()), PBHelper.convert(proto.getHeader().getToken())); } finally { if (traceScope != null) traceScope.close(); @@ -257,7 +256,7 @@ public abstract class Receiver implements DataTransferProtocol { TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName()); try { - blockChecksum(PBHelper.convert(proto.getHeader().getBlock()), + blockChecksum(PBHelperClient.convert(proto.getHeader().getBlock()), PBHelper.convert(proto.getHeader().getToken())); } finally { if (traceScope != null) traceScope.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java deleted file mode 100644 index 21073eb..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocolPB; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; -import org.apache.hadoop.ipc.ProtocolInfo; -import org.apache.hadoop.security.KerberosInfo; -import org.apache.hadoop.security.token.TokenInfo; - -@KerberosInfo( - serverPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY) -@TokenInfo(BlockTokenSelector.class) -@ProtocolInfo(protocolName = - "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol", - protocolVersion = 1) -@InterfaceAudience.Private -public interface ClientDatanodeProtocolPB extends - ClientDatanodeProtocolService.BlockingInterface { -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index 3886007..5efcf67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hdfs.protocolPB; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import com.google.common.base.Optional; @@ -86,7 +84,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements throws ServiceException { long len; try { - len = impl.getReplicaVisibleLength(PBHelper.convert(request.getBlock())); + len = impl.getReplicaVisibleLength(PBHelperClient.convert(request.getBlock())); } catch (IOException e) { throw new ServiceException(e); } @@ -123,7 +121,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements throws ServiceException { BlockLocalPathInfo resp; try { - resp = impl.getBlockLocalPathInfo(PBHelper.convert(request.getBlock()), PBHelper.convert(request.getToken())); + resp = impl.getBlockLocalPathInfo(PBHelperClient.convert(request.getBlock()), PBHelper.convert(request.getToken())); } catch (IOException e) { throw new ServiceException(e); }