http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java new file mode 100755 index 0000000..de1d1ee --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -0,0 +1,918 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.channels.ClosedChannelException; +import java.util.EnumSet; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.CanSetDropBehind; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSOutputSummer; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; +import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; +import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException; +import org.apache.hadoop.hdfs.server.namenode.SafeModeException; +import org.apache.hadoop.hdfs.util.ByteArrayManager; +import org.apache.hadoop.io.EnumSetWritable; +import 
org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.DataChecksum.Type; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.Time; +import org.apache.htrace.Sampler; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + + +/**************************************************************** + * DFSOutputStream creates files from a stream of bytes. + * + * The client application writes data that is cached internally by + * this stream. Data is broken up into packets, each packet is + * typically 64K in size. A packet comprises of chunks. Each chunk + * is typically 512 bytes and has an associated checksum with it. + * + * When a client application fills up the currentPacket, it is + * enqueued into the dataQueue of DataStreamer. DataStreamer is a + * thread that picks up packets from the dataQueue and sends it to + * the first datanode in the pipeline. + * + ****************************************************************/ +@InterfaceAudience.Private +public class DFSOutputStream extends FSOutputSummer + implements Syncable, CanSetDropBehind { + static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class); + /** + * Number of times to retry creating a file when there are transient + * errors (typically related to encryption zones and KeyProvider operations). + */ + @VisibleForTesting + static final int CREATE_RETRY_COUNT = 10; + @VisibleForTesting + static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS = + CryptoProtocolVersion.supported(); + + protected final DFSClient dfsClient; + protected final ByteArrayManager byteArrayManager; + // closed is accessed by different threads under different locks. + protected volatile boolean closed = false; + + protected final String src; + protected final long fileId; + protected final long blockSize; + protected final int bytesPerChecksum; + + protected DFSPacket currentPacket = null; + private DataStreamer streamer; + protected int packetSize = 0; // write packet size, not including the header. 
+ protected int chunksPerPacket = 0; + protected long lastFlushOffset = 0; // offset when flush was invoked + private long initialFileSize = 0; // at time of file open + private final short blockReplication; // replication factor of file + protected boolean shouldSyncBlock = false; // force blocks to disk upon close + protected final AtomicReference<CachingStrategy> cachingStrategy; + private FileEncryptionInfo fileEncryptionInfo; + + /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ + protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock, + long seqno, boolean lastPacketInBlock) throws InterruptedIOException { + final byte[] buf; + final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize; + + try { + buf = byteArrayManager.newByteArray(bufferSize); + } catch (InterruptedException ie) { + final InterruptedIOException iioe = new InterruptedIOException( + "seqno=" + seqno); + iioe.initCause(ie); + throw iioe; + } + + return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno, + getChecksumSize(), lastPacketInBlock); + } + + @Override + protected void checkClosed() throws IOException { + if (isClosed()) { + getStreamer().getLastException().throwException4Close(); + } + } + + // + // returns the list of targets, if any, that is being currently used. + // + @VisibleForTesting + public synchronized DatanodeInfo[] getPipeline() { + if (getStreamer().streamerClosed()) { + return null; + } + DatanodeInfo[] currentNodes = getStreamer().getNodes(); + if (currentNodes == null) { + return null; + } + DatanodeInfo[] value = new DatanodeInfo[currentNodes.length]; + for (int i = 0; i < currentNodes.length; i++) { + value[i] = currentNodes[i]; + } + return value; + } + + /** + * @return the object for computing checksum. + * The type is NULL if checksum is not computed. + */ + private static DataChecksum getChecksum4Compute(DataChecksum checksum, + HdfsFileStatus stat) { + if (DataStreamer.isLazyPersist(stat) && stat.getReplication() == 1) { + // do not compute checksum for writing to single replica to memory + return DataChecksum.newDataChecksum(Type.NULL, + checksum.getBytesPerChecksum()); + } + return checksum; + } + + private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress, + HdfsFileStatus stat, DataChecksum checksum) throws IOException { + super(getChecksum4Compute(checksum, stat)); + this.dfsClient = dfsClient; + this.src = src; + this.fileId = stat.getFileId(); + this.blockSize = stat.getBlockSize(); + this.blockReplication = stat.getReplication(); + this.fileEncryptionInfo = stat.getFileEncryptionInfo(); + this.cachingStrategy = new AtomicReference<CachingStrategy>( + dfsClient.getDefaultWriteCachingStrategy()); + if ((progress != null) && DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug( + "Set non-null progress callback on DFSOutputStream " + src); + } + + this.bytesPerChecksum = checksum.getBytesPerChecksum(); + if (bytesPerChecksum <= 0) { + throw new HadoopIllegalArgumentException( + "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0"); + } + if (blockSize % bytesPerChecksum != 0) { + throw new HadoopIllegalArgumentException("Invalid values: " + + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum + + ") must divide block size (=" + blockSize + ")."); + } + this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager(); + } + + /** Construct a new output stream for creating a file. 
*/ + protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, + EnumSet<CreateFlag> flag, Progressable progress, + DataChecksum checksum, String[] favoredNodes) throws IOException { + this(dfsClient, src, progress, stat, checksum); + this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); + + computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); + + streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum, + cachingStrategy, byteArrayManager, favoredNodes); + } + + static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, + FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, + short replication, long blockSize, Progressable progress, int buffersize, + DataChecksum checksum, String[] favoredNodes) throws IOException { + TraceScope scope = + dfsClient.getPathTraceScope("newStreamForCreate", src); + try { + HdfsFileStatus stat = null; + + // Retry the create if we get a RetryStartFileException up to a maximum + // number of times + boolean shouldRetry = true; + int retryCount = CREATE_RETRY_COUNT; + while (shouldRetry) { + shouldRetry = false; + try { + stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, + new EnumSetWritable<CreateFlag>(flag), createParent, replication, + blockSize, SUPPORTED_CRYPTO_VERSIONS); + break; + } catch (RemoteException re) { + IOException e = re.unwrapRemoteException( + AccessControlException.class, + DSQuotaExceededException.class, + QuotaByStorageTypeExceededException.class, + FileAlreadyExistsException.class, + FileNotFoundException.class, + ParentNotDirectoryException.class, + NSQuotaExceededException.class, + RetryStartFileException.class, + SafeModeException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class, + UnknownCryptoProtocolVersionException.class); + if (e instanceof RetryStartFileException) { + if (retryCount > 0) { + shouldRetry = true; + retryCount--; + } else { + throw new IOException("Too many retries because of encryption" + + " zone operations", e); + } + } else { + throw e; + } + } + } + Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!"); + final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, + flag, progress, checksum, favoredNodes); + out.start(); + return out; + } finally { + scope.close(); + } + } + + /** Construct a new output stream for append. */ + private DFSOutputStream(DFSClient dfsClient, String src, + EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock, + HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) + throws IOException { + this(dfsClient, src, progress, stat, checksum); + initialFileSize = stat.getLen(); // length of file when opened + this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK); + + boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK); + + this.fileEncryptionInfo = stat.getFileEncryptionInfo(); + + // The last partial block of the file has to be filled. + if (!toNewBlock && lastBlock != null) { + // indicate that we are appending to an existing block + streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum, + cachingStrategy, byteArrayManager); + getStreamer().setBytesCurBlock(lastBlock.getBlockSize()); + adjustPacketChunkSize(stat); + getStreamer().setPipelineInConstruction(lastBlock); + } else { + computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), + bytesPerChecksum); + streamer = new DataStreamer(stat, lastBlock != null ? 
lastBlock.getBlock() : null, + dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager, + favoredNodes); + } + } + + private void adjustPacketChunkSize(HdfsFileStatus stat) throws IOException{ + + long usedInLastBlock = stat.getLen() % blockSize; + int freeInLastBlock = (int)(blockSize - usedInLastBlock); + + // calculate the amount of free space in the pre-existing + // last crc chunk + int usedInCksum = (int)(stat.getLen() % bytesPerChecksum); + int freeInCksum = bytesPerChecksum - usedInCksum; + + // if there is space in the last block, then we have to + // append to that block + if (freeInLastBlock == blockSize) { + throw new IOException("The last block for file " + + src + " is full."); + } + + if (usedInCksum > 0 && freeInCksum > 0) { + // if there is space in the last partial chunk, then + // setup in such a way that the next packet will have only + // one chunk that fills up the partial chunk. + // + computePacketChunkSize(0, freeInCksum); + setChecksumBufSize(freeInCksum); + getStreamer().setAppendChunk(true); + } else { + // if the remaining space in the block is smaller than + // that expected size of of a packet, then create + // smaller size packet. + // + computePacketChunkSize( + Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock), + bytesPerChecksum); + } + } + + static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, + EnumSet<CreateFlag> flags, int bufferSize, Progressable progress, + LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, + String[] favoredNodes) throws IOException { + TraceScope scope = + dfsClient.getPathTraceScope("newStreamForAppend", src); + try { + final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, + progress, lastBlock, stat, checksum, favoredNodes); + out.start(); + return out; + } finally { + scope.close(); + } + } + + protected void computePacketChunkSize(int psize, int csize) { + final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN; + final int chunkSize = csize + getChecksumSize(); + chunksPerPacket = Math.max(bodySize/chunkSize, 1); + packetSize = chunkSize*chunksPerPacket; + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("computePacketChunkSize: src=" + src + + ", chunkSize=" + chunkSize + + ", chunksPerPacket=" + chunksPerPacket + + ", packetSize=" + packetSize); + } + } + + protected TraceScope createWriteTraceScope() { + return dfsClient.getPathTraceScope("DFSOutputStream#write", src); + } + + // @see FSOutputSummer#writeChunk() + @Override + protected synchronized void writeChunk(byte[] b, int offset, int len, + byte[] checksum, int ckoff, int cklen) throws IOException { + dfsClient.checkOpen(); + checkClosed(); + + if (len > bytesPerChecksum) { + throw new IOException("writeChunk() buffer size is " + len + + " is larger than supported bytesPerChecksum " + + bytesPerChecksum); + } + if (cklen != 0 && cklen != getChecksumSize()) { + throw new IOException("writeChunk() checksum size is supposed to be " + + getChecksumSize() + " but found to be " + cklen); + } + + if (currentPacket == null) { + currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer() + .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + + currentPacket.getSeqno() + + ", src=" + src + + ", packetSize=" + packetSize + + ", chunksPerPacket=" + chunksPerPacket + + ", bytesCurBlock=" + getStreamer().getBytesCurBlock()); + } + } + + 
currentPacket.writeChecksum(checksum, ckoff, cklen); + currentPacket.writeData(b, offset, len); + currentPacket.incNumChunks(); + getStreamer().incBytesCurBlock(len); + + // If packet is full, enqueue it for transmission + // + if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || + getStreamer().getBytesCurBlock() == blockSize) { + enqueueCurrentPacketFull(); + } + } + + void enqueueCurrentPacket() throws IOException { + getStreamer().waitAndQueuePacket(currentPacket); + currentPacket = null; + } + + void enqueueCurrentPacketFull() throws IOException { + LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={}," + + " appendChunk={}, {}", currentPacket, src, getStreamer() + .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(), + getStreamer()); + enqueueCurrentPacket(); + adjustChunkBoundary(); + endBlock(); + } + + /** create an empty packet to mark the end of the block. */ + void setCurrentPacketToEmpty() throws InterruptedIOException { + currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), + getStreamer().getAndIncCurrentSeqno(), true); + currentPacket.setSyncBlock(shouldSyncBlock); + } + + /** + * If the reopened file did not end at chunk boundary and the above + * write filled up its partial chunk. Tell the summer to generate full + * crc chunks from now on. + */ + protected void adjustChunkBoundary() { + if (getStreamer().getAppendChunk() && + getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) { + getStreamer().setAppendChunk(false); + resetChecksumBufSize(); + } + + if (!getStreamer().getAppendChunk()) { + int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()), + dfsClient.getConf().getWritePacketSize()); + computePacketChunkSize(psize, bytesPerChecksum); + } + } + + /** + * if encountering a block boundary, send an empty packet to + * indicate the end of block and reset bytesCurBlock. + * + * @throws IOException + */ + protected void endBlock() throws IOException { + if (getStreamer().getBytesCurBlock() == blockSize) { + setCurrentPacketToEmpty(); + enqueueCurrentPacket(); + getStreamer().setBytesCurBlock(0); + lastFlushOffset = 0; + } + } + + /** + * Flushes out to all replicas of the block. The data is in the buffers + * of the DNs but not necessarily in the DN's OS buffers. + * + * It is a synchronous operation. When it returns, + * it guarantees that flushed data become visible to new readers. + * It is not guaranteed that data has been flushed to + * persistent store on the datanode. + * Block allocations are persisted on namenode. + */ + @Override + public void hflush() throws IOException { + TraceScope scope = + dfsClient.getPathTraceScope("hflush", src); + try { + flushOrSync(false, EnumSet.noneOf(SyncFlag.class)); + } finally { + scope.close(); + } + } + + @Override + public void hsync() throws IOException { + TraceScope scope = + dfsClient.getPathTraceScope("hsync", src); + try { + flushOrSync(true, EnumSet.noneOf(SyncFlag.class)); + } finally { + scope.close(); + } + } + + /** + * The expected semantics is all data have flushed out to all replicas + * and all replicas have done posix fsync equivalent - ie the OS has + * flushed it to the disk device (but the disk may have it in its cache). + * + * Note that only the current block is flushed to the disk device. + * To guarantee durable sync across block boundaries the stream should + * be created with {@link CreateFlag#SYNC_BLOCK}. + * + * @param syncFlags + * Indicate the semantic of the sync. 
Currently used to specify + * whether or not to update the block length in NameNode. + */ + public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException { + TraceScope scope = + dfsClient.getPathTraceScope("hsync", src); + try { + flushOrSync(true, syncFlags); + } finally { + scope.close(); + } + } + + /** + * Flush/Sync buffered data to DataNodes. + * + * @param isSync + * Whether or not to require all replicas to flush data to the disk + * device + * @param syncFlags + * Indicate extra detailed semantic of the flush/sync. Currently + * mainly used to specify whether or not to update the file length in + * the NameNode + * @throws IOException + */ + private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags) + throws IOException { + dfsClient.checkOpen(); + checkClosed(); + try { + long toWaitFor; + long lastBlockLength = -1L; + boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH); + boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK); + synchronized (this) { + // flush checksum buffer, but keep checksum buffer intact if we do not + // need to end the current block + int numKept = flushBuffer(!endBlock, true); + // bytesCurBlock potentially incremented if there was buffered data + + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("DFSClient flush(): " + + " bytesCurBlock=" + getStreamer().getBytesCurBlock() + + " lastFlushOffset=" + lastFlushOffset + + " createNewBlock=" + endBlock); + } + // Flush only if we haven't already flushed till this offset. + if (lastFlushOffset != getStreamer().getBytesCurBlock()) { + assert getStreamer().getBytesCurBlock() > lastFlushOffset; + // record the valid offset of this flush + lastFlushOffset = getStreamer().getBytesCurBlock(); + if (isSync && currentPacket == null && !endBlock) { + // Nothing to send right now, + // but sync was requested. + // Send an empty packet if we do not end the block right now + currentPacket = createPacket(packetSize, chunksPerPacket, + getStreamer().getBytesCurBlock(), getStreamer() + .getAndIncCurrentSeqno(), false); + } + } else { + if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) { + // Nothing to send right now, + // and the block was partially written, + // and sync was requested. + // So send an empty sync packet if we do not end the block right + // now + currentPacket = createPacket(packetSize, chunksPerPacket, + getStreamer().getBytesCurBlock(), getStreamer() + .getAndIncCurrentSeqno(), false); + } else if (currentPacket != null) { + // just discard the current packet since it is already been sent. + currentPacket.releaseBuffer(byteArrayManager); + currentPacket = null; + } + } + if (currentPacket != null) { + currentPacket.setSyncBlock(isSync); + enqueueCurrentPacket(); + } + if (endBlock && getStreamer().getBytesCurBlock() > 0) { + // Need to end the current block, thus send an empty packet to + // indicate this is the end of the block and reset bytesCurBlock + currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), + getStreamer().getAndIncCurrentSeqno(), true); + currentPacket.setSyncBlock(shouldSyncBlock || isSync); + enqueueCurrentPacket(); + getStreamer().setBytesCurBlock(0); + lastFlushOffset = 0; + } else { + // Restore state of stream. Record the last flush offset + // of the last full chunk that was flushed. 
+ getStreamer().setBytesCurBlock( + getStreamer().getBytesCurBlock() - numKept); + } + + toWaitFor = getStreamer().getLastQueuedSeqno(); + } // end synchronized + + getStreamer().waitForAckedSeqno(toWaitFor); + + // update the block length first time irrespective of flag + if (updateLength || getStreamer().getPersistBlocks().get()) { + synchronized (this) { + if (!getStreamer().streamerClosed() + && getStreamer().getBlock() != null) { + lastBlockLength = getStreamer().getBlock().getNumBytes(); + } + } + } + // If 1) any new blocks were allocated since the last flush, or 2) to + // update length in NN is required, then persist block locations on + // namenode. + if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) { + try { + dfsClient.namenode.fsync(src, fileId, dfsClient.clientName, + lastBlockLength); + } catch (IOException ioe) { + DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe); + // If we got an error here, it might be because some other thread called + // close before our hflush completed. In that case, we should throw an + // exception that the stream is closed. + checkClosed(); + // If we aren't closed but failed to sync, we should expose that to the + // caller. + throw ioe; + } + } + + synchronized(this) { + if (!getStreamer().streamerClosed()) { + getStreamer().setHflush(); + } + } + } catch (InterruptedIOException interrupt) { + // This kind of error doesn't mean that the stream itself is broken - just the + // flushing thread got interrupted. So, we shouldn't close down the writer, + // but instead just propagate the error + throw interrupt; + } catch (IOException e) { + DFSClient.LOG.warn("Error while syncing", e); + synchronized (this) { + if (!isClosed()) { + getStreamer().getLastException().set(e); + closeThreads(true); + } + } + throw e; + } + } + + /** + * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}. + */ + @Deprecated + public synchronized int getNumCurrentReplicas() throws IOException { + return getCurrentBlockReplication(); + } + + /** + * Note that this is not a public API; + * use {@link HdfsDataOutputStream#getCurrentBlockReplication()} instead. + * + * @return the number of valid replicas of the current block + */ + public synchronized int getCurrentBlockReplication() throws IOException { + dfsClient.checkOpen(); + checkClosed(); + if (getStreamer().streamerClosed()) { + return blockReplication; // no pipeline, return repl factor of file + } + DatanodeInfo[] currentNodes = getStreamer().getNodes(); + if (currentNodes == null) { + return blockReplication; // no pipeline, return repl factor of file + } + return currentNodes.length; + } + + /** + * Waits till all existing data is flushed and confirmations + * received from datanodes. + */ + protected void flushInternal() throws IOException { + long toWaitFor; + synchronized (this) { + dfsClient.checkOpen(); + checkClosed(); + // + // If there is data in the current buffer, send it across + // + getStreamer().queuePacket(currentPacket); + currentPacket = null; + toWaitFor = getStreamer().getLastQueuedSeqno(); + } + + getStreamer().waitForAckedSeqno(toWaitFor); + } + + protected synchronized void start() { + getStreamer().start(); + } + + /** + * Aborts this output stream and releases any system + * resources associated with this stream. 
+ */ + synchronized void abort() throws IOException { + if (isClosed()) { + return; + } + getStreamer().getLastException().set(new IOException("Lease timeout of " + + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired.")); + closeThreads(true); + dfsClient.endFileLease(fileId); + } + + boolean isClosed() { + return closed || getStreamer().streamerClosed(); + } + + void setClosed() { + closed = true; + getStreamer().release(); + } + + // shutdown datastreamer and responseprocessor threads. + // interrupt datastreamer if force is true + protected void closeThreads(boolean force) throws IOException { + try { + getStreamer().close(force); + getStreamer().join(); + getStreamer().closeSocket(); + } catch (InterruptedException e) { + throw new IOException("Failed to shutdown streamer"); + } finally { + getStreamer().setSocketToNull(); + setClosed(); + } + } + + /** + * Closes this output stream and releases any system + * resources associated with this stream. + */ + @Override + public synchronized void close() throws IOException { + TraceScope scope = + dfsClient.getPathTraceScope("DFSOutputStream#close", src); + try { + closeImpl(); + } finally { + scope.close(); + } + } + + protected synchronized void closeImpl() throws IOException { + if (isClosed()) { + getStreamer().getLastException().check(true); + return; + } + + try { + flushBuffer(); // flush from all upper layers + + if (currentPacket != null) { + enqueueCurrentPacket(); + } + + if (getStreamer().getBytesCurBlock() != 0) { + setCurrentPacketToEmpty(); + } + + flushInternal(); // flush all data to Datanodes + // get last block before destroying the streamer + ExtendedBlock lastBlock = getStreamer().getBlock(); + closeThreads(false); + TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); + try { + completeFile(lastBlock); + } finally { + scope.close(); + } + dfsClient.endFileLease(fileId); + } catch (ClosedChannelException e) { + } finally { + setClosed(); + } + } + + // should be called holding (this) lock since setTestFilename() may + // be called during unit tests + protected void completeFile(ExtendedBlock last) throws IOException { + long localstart = Time.monotonicNow(); + final DfsClientConf conf = dfsClient.getConf(); + long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); + boolean fileComplete = false; + int retries = conf.getNumBlockWriteLocateFollowingRetry(); + while (!fileComplete) { + fileComplete = + dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId); + if (!fileComplete) { + final int hdfsTimeout = conf.getHdfsTimeout(); + if (!dfsClient.clientRunning + || (hdfsTimeout > 0 + && localstart + hdfsTimeout < Time.monotonicNow())) { + String msg = "Unable to close file because dfsclient " + + " was unable to contact the HDFS servers." 
+ + " clientRunning " + dfsClient.clientRunning + + " hdfsTimeout " + hdfsTimeout; + DFSClient.LOG.info(msg); + throw new IOException(msg); + } + try { + if (retries == 0) { + throw new IOException("Unable to close file because the last block" + + " does not have enough number of replicas."); + } + retries--; + Thread.sleep(sleeptime); + sleeptime *= 2; + if (Time.monotonicNow() - localstart > 5000) { + DFSClient.LOG.info("Could not complete " + src + " retrying..."); + } + } catch (InterruptedException ie) { + DFSClient.LOG.warn("Caught exception ", ie); + } + } + } + } + + @VisibleForTesting + public void setArtificialSlowdown(long period) { + getStreamer().setArtificialSlowdown(period); + } + + @VisibleForTesting + public synchronized void setChunksPerPacket(int value) { + chunksPerPacket = Math.min(chunksPerPacket, value); + packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket; + } + + /** + * Returns the size of a file as it was when this stream was opened + */ + public long getInitialLen() { + return initialFileSize; + } + + /** + * @return the FileEncryptionInfo for this stream, or null if not encrypted. + */ + public FileEncryptionInfo getFileEncryptionInfo() { + return fileEncryptionInfo; + } + + /** + * Returns the access token currently used by streamer, for testing only + */ + synchronized Token<BlockTokenIdentifier> getBlockToken() { + return getStreamer().getBlockToken(); + } + + @Override + public void setDropBehind(Boolean dropBehind) throws IOException { + CachingStrategy prevStrategy, nextStrategy; + // CachingStrategy is immutable. So build a new CachingStrategy with the + // modifications we want, and compare-and-swap it in. + do { + prevStrategy = this.cachingStrategy.get(); + nextStrategy = new CachingStrategy.Builder(prevStrategy). + setDropBehind(dropBehind).build(); + } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy)); + } + + @VisibleForTesting + ExtendedBlock getBlock() { + return getStreamer().getBlock(); + } + + @VisibleForTesting + public long getFileId() { + return fileId; + } + + /** + * Return the source of stream. + */ + String getSrc() { + return src; + } + + /** + * Returns the data streamer object. + */ + protected DataStreamer getStreamer() { + return streamer; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java new file mode 100755 index 0000000..22055c3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.nio.channels.ClosedChannelException; +import java.util.Arrays; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.util.ByteArrayManager; +import org.apache.htrace.Span; + +/**************************************************************** + * DFSPacket is used by DataStreamer and DFSOutputStream. + * DFSOutputStream generates packets and then ask DatStreamer + * to send them to datanodes. + ****************************************************************/ + +@InterfaceAudience.Private +class DFSPacket { + public static final long HEART_BEAT_SEQNO = -1L; + private static long[] EMPTY = new long[0]; + private final long seqno; // sequence number of buffer in block + private final long offsetInBlock; // offset in block + private boolean syncBlock; // this packet forces the current block to disk + private int numChunks; // number of chunks currently in packet + private final int maxChunks; // max chunks in packet + private byte[] buf; + private final boolean lastPacketInBlock; // is this the last packet in block? + + /** + * buf is pointed into like follows: + * (C is checksum data, D is payload data) + * + * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___] + * ^ ^ ^ ^ + * | checksumPos dataStart dataPos + * checksumStart + * + * Right before sending, we move the checksum data to immediately precede + * the actual data, and then insert the header into the buffer immediately + * preceding the checksum data, so we make sure to keep enough space in + * front of the checksum data to support the largest conceivable header. + */ + private int checksumStart; + private int checksumPos; + private final int dataStart; + private int dataPos; + private long[] traceParents = EMPTY; + private int traceParentsUsed; + private Span span; + + /** + * Create a new packet. 
+ * + * @param buf the buffer storing data and checksums + * @param chunksPerPkt maximum number of chunks per packet. + * @param offsetInBlock offset in bytes into the HDFS block. + * @param seqno the sequence number of this packet + * @param checksumSize the size of checksum + * @param lastPacketInBlock if this is the last packet + */ + DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno, + int checksumSize, boolean lastPacketInBlock) { + this.lastPacketInBlock = lastPacketInBlock; + this.numChunks = 0; + this.offsetInBlock = offsetInBlock; + this.seqno = seqno; + + this.buf = buf; + + checksumStart = PacketHeader.PKT_MAX_HEADER_LEN; + checksumPos = checksumStart; + dataStart = checksumStart + (chunksPerPkt * checksumSize); + dataPos = dataStart; + maxChunks = chunksPerPkt; + } + + /** + * Write data to this packet. + * + * @param inarray input array of data + * @param off the offset of data to write + * @param len the length of data to write + * @throws ClosedChannelException + */ + synchronized void writeData(byte[] inarray, int off, int len) + throws ClosedChannelException { + checkBuffer(); + if (dataPos + len > buf.length) { + throw new BufferOverflowException(); + } + System.arraycopy(inarray, off, buf, dataPos, len); + dataPos += len; + } + + /** + * Write checksums to this packet + * + * @param inarray input array of checksums + * @param off the offset of checksums to write + * @param len the length of checksums to write + * @throws ClosedChannelException + */ + synchronized void writeChecksum(byte[] inarray, int off, int len) + throws ClosedChannelException { + checkBuffer(); + if (len == 0) { + return; + } + if (checksumPos + len > dataStart) { + throw new BufferOverflowException(); + } + System.arraycopy(inarray, off, buf, checksumPos, len); + checksumPos += len; + } + + /** + * Write the full packet, including the header, to the given output stream. + * + * @param stm + * @throws IOException + */ + synchronized void writeTo(DataOutputStream stm) throws IOException { + checkBuffer(); + + final int dataLen = dataPos - dataStart; + final int checksumLen = checksumPos - checksumStart; + final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen; + + PacketHeader header = new PacketHeader( + pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock); + + if (checksumPos != dataStart) { + // Move the checksum to cover the gap. This can happen for the last + // packet or during an hflush/hsync call. + System.arraycopy(buf, checksumStart, buf, + dataStart - checksumLen , checksumLen); + checksumPos = dataStart; + checksumStart = checksumPos - checksumLen; + } + + final int headerStart = checksumStart - header.getSerializedSize(); + assert checksumStart + 1 >= header.getSerializedSize(); + assert headerStart >= 0; + assert headerStart + header.getSerializedSize() == checksumStart; + + // Copy the header data into the buffer immediately preceding the checksum + // data. + System.arraycopy(header.getBytes(), 0, buf, headerStart, + header.getSerializedSize()); + + // corrupt the data for testing. + if (DFSClientFaultInjector.get().corruptPacket()) { + buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; + } + + // Write the now contiguous full packet to the output stream. + stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen); + + // undo corruption. 
+ if (DFSClientFaultInjector.get().uncorruptPacket()) { + buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; + } + } + + private synchronized void checkBuffer() throws ClosedChannelException { + if (buf == null) { + throw new ClosedChannelException(); + } + } + + /** + * Release the buffer in this packet to ByteArrayManager. + * + * @param bam + */ + synchronized void releaseBuffer(ByteArrayManager bam) { + bam.release(buf); + buf = null; + } + + /** + * get the packet's last byte's offset in the block + * + * @return the packet's last byte's offset in the block + */ + synchronized long getLastByteOffsetBlock() { + return offsetInBlock + dataPos - dataStart; + } + + /** + * Check if this packet is a heart beat packet + * + * @return true if the sequence number is HEART_BEAT_SEQNO + */ + boolean isHeartbeatPacket() { + return seqno == HEART_BEAT_SEQNO; + } + + /** + * check if this packet is the last packet in block + * + * @return true if the packet is the last packet + */ + boolean isLastPacketInBlock(){ + return lastPacketInBlock; + } + + /** + * get sequence number of this packet + * + * @return the sequence number of this packet + */ + long getSeqno(){ + return seqno; + } + + /** + * get the number of chunks this packet contains + * + * @return the number of chunks in this packet + */ + synchronized int getNumChunks(){ + return numChunks; + } + + /** + * increase the number of chunks by one + */ + synchronized void incNumChunks(){ + numChunks++; + } + + /** + * get the maximum number of packets + * + * @return the maximum number of packets + */ + int getMaxChunks(){ + return maxChunks; + } + + /** + * set if to sync block + * + * @param syncBlock if to sync block + */ + synchronized void setSyncBlock(boolean syncBlock){ + this.syncBlock = syncBlock; + } + + @Override + public String toString() { + return "packet seqno: " + this.seqno + + " offsetInBlock: " + this.offsetInBlock + + " lastPacketInBlock: " + this.lastPacketInBlock + + " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock(); + } + + /** + * Add a trace parent span for this packet.<p/> + * + * Trace parent spans for a packet are the trace spans responsible for + * adding data to that packet. We store them as an array of longs for + * efficiency.<p/> + * + * Protected by the DFSOutputStream dataQueue lock. + */ + public void addTraceParent(Span span) { + if (span == null) { + return; + } + addTraceParent(span.getSpanId()); + } + + public void addTraceParent(long id) { + if (traceParentsUsed == traceParents.length) { + int newLength = (traceParents.length == 0) ? 8 : + traceParents.length * 2; + traceParents = Arrays.copyOf(traceParents, newLength); + } + traceParents[traceParentsUsed] = id; + traceParentsUsed++; + } + + /** + * Get the trace parent spans for this packet.<p/> + * + * Will always be non-null.<p/> + * + * Protected by the DFSOutputStream dataQueue lock. + */ + public long[] getTraceParents() { + // Remove duplicates from the array. 
+ int len = traceParentsUsed; + Arrays.sort(traceParents, 0, len); + int i = 0, j = 0; + long prevVal = 0; // 0 is not a valid span id + while (true) { + if (i == len) { + break; + } + long val = traceParents[i]; + if (val != prevVal) { + traceParents[j] = val; + j++; + prevVal = val; + } + i++; + } + if (j < traceParents.length) { + traceParents = Arrays.copyOf(traceParents, j); + traceParentsUsed = traceParents.length; + } + return traceParents; + } + + public void setTraceSpan(Span span) { + this.span = span; + } + + public Span getTraceSpan() { + return span; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index 359886e..e275afb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -590,6 +591,29 @@ public class DFSUtilClient { } } + public static int getIoFileBufferSize(Configuration conf) { + return conf.getInt( + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); + } + + public static int getSmallBufferSize(Configuration conf) { + return Math.min(getIoFileBufferSize(conf) / 2, 512); + } + + /** + * Probe for HDFS Encryption being enabled; this uses the value of + * the option {@link HdfsClientConfigKeys#DFS_ENCRYPTION_KEY_PROVIDER_URI}, + * returning true if that property contains a non-empty, non-whitespace + * string. + * @param conf configuration to probe + * @return true if encryption is considered enabled. + */ + public static boolean isHDFSEncryptionEnabled(Configuration conf) { + return !conf.getTrimmed( + HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty(); + } + public static InetSocketAddress getNNAddress(String address) { return NetUtils.createSocketAddr(address, HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
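[Editor's note, not part of the commit above: an illustrative caller of the three DFSUtilClient helpers added in the hunk just shown. It assumes only the methods visible in this diff; the property probed by isHDFSEncryptionEnabled is the client key-provider URI setting referenced in that method's javadoc.]

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;

public class ClientConfProbe {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // io.file.buffer.size (4096 by default) and the capped "small buffer" derived from it.
    int ioBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
    int smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); // min(ioBufferSize / 2, 512)
    // True only when the key provider URI property is set to a non-empty, non-whitespace value.
    boolean encryptionEnabled = DFSUtilClient.isHDFSEncryptionEnabled(conf);
    System.out.println("io buffer=" + ioBufferSize
        + ", small buffer=" + smallBufferSize
        + ", encryption enabled=" + encryptionEnabled);
  }
}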