Repository: hadoop
Updated Branches:
  refs/heads/branch-2 00dec84cc -> 954dae26c
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
index a66847a..42f0c58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
@@ -20,15 +20,15 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 import java.io.Closeable;
 import java.io.FileDescriptor;
 import java.io.FileOutputStream;
+import java.io.Flushable;
 import java.io.OutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIOException;
 import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 
 /**
@@ -43,21 +43,22 @@ public class ReplicaOutputStreams implements Closeable {
   /** Stream to checksum. */
   private final OutputStream checksumOut;
   private final DataChecksum checksum;
-  private final boolean isTransientStorage;
-  private final long slowLogThresholdMs;
+  private final FsVolumeSpi volume;
+  private final FileIoProvider fileIoProvider;
 
   /**
    * Create an object with a data output stream, a checksum output stream
    * and a checksum.
    */
-  public ReplicaOutputStreams(OutputStream dataOut,
-      OutputStream checksumOut, DataChecksum checksum,
-      boolean isTransientStorage, long slowLogThresholdMs) {
+  public ReplicaOutputStreams(
+      OutputStream dataOut, OutputStream checksumOut, DataChecksum checksum,
+      FsVolumeSpi volume, FileIoProvider fileIoProvider) {
+
     this.dataOut = dataOut;
     this.checksum = checksum;
-    this.slowLogThresholdMs = slowLogThresholdMs;
-    this.isTransientStorage = isTransientStorage;
     this.checksumOut = checksumOut;
+    this.volume = volume;
+    this.fileIoProvider = fileIoProvider;
 
     try {
       if (this.dataOut instanceof FileOutputStream) {
@@ -93,7 +94,7 @@ public class ReplicaOutputStreams implements Closeable {
 
   /** @return is writing to a transient storage? */
   public boolean isTransientStorage() {
-    return isTransientStorage;
+    return volume.isTransientStorage();
   }
 
   @Override
@@ -112,7 +113,7 @@ public class ReplicaOutputStreams implements Closeable {
    */
   public void syncDataOut() throws IOException {
     if (dataOut instanceof FileOutputStream) {
-      sync((FileOutputStream)dataOut);
+      fileIoProvider.sync(volume, (FileOutputStream) dataOut);
     }
   }
 
@@ -121,7 +122,7 @@ public class ReplicaOutputStreams implements Closeable {
    */
   public void syncChecksumOut() throws IOException {
     if (checksumOut instanceof FileOutputStream) {
-      sync((FileOutputStream)checksumOut);
+      fileIoProvider.sync(volume, (FileOutputStream) checksumOut);
     }
   }
 
@@ -129,60 +130,34 @@ public class ReplicaOutputStreams implements Closeable {
    * Flush the data stream if it supports it.
    */
   public void flushDataOut() throws IOException {
-    flush(dataOut);
+    if (dataOut != null) {
+      fileIoProvider.flush(volume, dataOut);
+    }
   }
 
   /**
    * Flush the checksum stream if it supports it.
    */
   public void flushChecksumOut() throws IOException {
-    flush(checksumOut);
-  }
-
-  private void flush(OutputStream dos) throws IOException {
-    long begin = Time.monotonicNow();
-    dos.flush();
-    long duration = Time.monotonicNow() - begin;
-    LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration);
-    if (duration > slowLogThresholdMs) {
-      LOG.warn("Slow flush took {} ms (threshold={} ms)", duration,
-          slowLogThresholdMs);
+    if (checksumOut != null) {
+      fileIoProvider.flush(volume, checksumOut);
     }
   }
 
-  private void sync(FileOutputStream fos) throws IOException {
-    long begin = Time.monotonicNow();
-    fos.getChannel().force(true);
-    long duration = Time.monotonicNow() - begin;
-    LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration);
-    if (duration > slowLogThresholdMs) {
-      LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration,
-          slowLogThresholdMs);
-    }
-  }
-
-  public long writeToDisk(byte[] b, int off, int len) throws IOException {
-    long begin = Time.monotonicNow();
+  public void writeDataToDisk(byte[] b, int off, int len)
+      throws IOException {
     dataOut.write(b, off, len);
-    long duration = Time.monotonicNow() - begin;
-    LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration);
-    if (duration > slowLogThresholdMs) {
-      LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " +
-          "(threshold={} ms)", duration, slowLogThresholdMs);
-    }
-    return duration;
   }
 
   public void syncFileRangeIfPossible(long offset, long nbytes,
       int flags) throws NativeIOException {
-    assert this.outFd != null : "null outFd!";
-    NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags);
+    fileIoProvider.syncFileRange(
+        volume, outFd, offset, nbytes, flags);
   }
 
   public void dropCacheBehindWrites(String identifier,
       long offset, long len, int flags) throws NativeIOException {
-    assert this.outFd != null : "null outFd!";
-    NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
-        identifier, outFd, offset, len, flags);
+    fileIoProvider.posixFadvise(
+        volume, identifier, outFd, offset, len, flags);
   }
 }
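[Editor's note: the per-call timing and slow-log bookkeeping that used to live in the private flush()/sync() helpers above moves behind FileIoProvider, whose implementation is not part of this diff. A minimal sketch of that centralized instrumentation pattern, under stated assumptions — InstrumentedIo and SLOW_THRESHOLD_MS are illustrative names, not the real class:

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class InstrumentedIo {
  private static final long SLOW_THRESHOLD_MS = 300;

  // Flush an output stream, timing the call the way the removed
  // private flush() helper did, but in one shared place.
  public void flush(OutputStream out) throws IOException {
    final long begin = System.nanoTime();
    out.flush();
    noteDuration("flush", (System.nanoTime() - begin) / 1_000_000);
  }

  // Force a FileOutputStream to disk, mirroring the removed sync() helper.
  public void sync(FileOutputStream fos) throws IOException {
    final long begin = System.nanoTime();
    fos.getChannel().force(true);
    noteDuration("fsync", (System.nanoTime() - begin) / 1_000_000);
  }

  private void noteDuration(String op, long durationMs) {
    if (durationMs > SLOW_THRESHOLD_MS) {
      System.err.printf("Slow %s took %d ms (threshold=%d ms)%n",
          op, durationMs, SLOW_THRESHOLD_MS);
    }
  }
}

Centralizing the wrapper means every caller of sync/flush gets the same tracing and slow-IO detection without threading a threshold parameter through each constructor, which is why slowLogThresholdMs disappears from the signatures below.]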
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 5b701b5..cd44557 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -32,19 +32,18 @@ import java.util.Iterator;
 import java.util.Scanner;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CachingGetSpaceUsed;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.GetSpaceUsed;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -63,7 +62,6 @@ import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.Timer;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.Files;
 
 /**
  * A block pool slice represents a portion of a block pool stored on a volume.
@@ -95,6 +93,7 @@ class BlockPoolSlice {
   private final long cachedDfsUsedCheckTime;
   private final Timer timer;
   private final int maxDataLength;
+  private final FileIoProvider fileIoProvider;
 
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final GetSpaceUsed dfsUsage;
@@ -112,6 +111,7 @@ class BlockPoolSlice {
       Configuration conf, Timer timer) throws IOException {
     this.bpid = bpid;
     this.volume = volume;
+    this.fileIoProvider = volume.getFileIoProvider();
     this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
     this.finalizedDir = new File(
         currentDir, DataStorage.STORAGE_DIR_FINALIZED);
@@ -146,25 +146,20 @@ class BlockPoolSlice {
     //
     this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
     if (tmpDir.exists()) {
-      DataStorage.fullyDelete(tmpDir);
+      fileIoProvider.fullyDelete(volume, tmpDir);
     }
     this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
     final boolean supportAppends = conf.getBoolean(
         DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
         DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
     if (rbwDir.exists() && !supportAppends) {
-      FileUtil.fullyDelete(rbwDir);
-    }
-    if (!rbwDir.mkdirs()) {  // create rbw directory if not exist
-      if (!rbwDir.isDirectory()) {
-        throw new IOException("Mkdirs failed to create " + rbwDir.toString());
-      }
-    }
-    if (!tmpDir.mkdirs()) {
-      if (!tmpDir.isDirectory()) {
-        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
-      }
+      fileIoProvider.fullyDelete(volume, rbwDir);
     }
+
+    // create the rbw and tmp directories if they don't exist.
+    fileIoProvider.mkdirs(volume, rbwDir);
+    fileIoProvider.mkdirs(volume, tmpDir);
+
     // Use cached value initially if available. Or the following call will
     // block until the initial du command completes.
     this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir)
@@ -271,7 +266,7 @@ class BlockPoolSlice {
    */
   void saveDfsUsed() {
     File outFile = new File(currentDir, DU_CACHE_FILE);
-    if (outFile.exists() && !outFile.delete()) {
+    if (!fileIoProvider.deleteWithExistsCheck(volume, outFile)) {
       FsDatasetImpl.LOG.warn("Failed to delete old dfsUsed file in " +
           outFile.getParent());
     }
@@ -279,10 +274,10 @@ class BlockPoolSlice {
     try {
       long used = getDfsUsed();
       try (Writer out = new OutputStreamWriter(
-          new FileOutputStream(outFile), "UTF-8")) {
+          fileIoProvider.getFileOutputStream(volume, outFile), "UTF-8")) {
         // mtime is written last, so that truncated writes won't be valid.
         out.write(Long.toString(used) + " " + Long.toString(timer.now()));
-        out.flush();
+        fileIoProvider.flush(volume, out);
       }
     } catch (IOException ioe) {
       // If write failed, the volume might be bad. Since the cache file is
@@ -297,7 +292,8 @@ class BlockPoolSlice {
    */
   File createTmpFile(Block b) throws IOException {
     File f = new File(tmpDir, b.getBlockName());
-    File tmpFile = DatanodeUtil.createTmpFile(b, f);
+    File tmpFile = DatanodeUtil.createFileWithExistsCheck(
+        volume, b, f, fileIoProvider);
     // If any exception during creation, its expected that counter will not be
     // incremented, So no need to decrement
     incrNumBlocks();
@@ -310,7 +306,8 @@ class BlockPoolSlice {
    */
   File createRbwFile(Block b) throws IOException {
     File f = new File(rbwDir, b.getBlockName());
-    File rbwFile = DatanodeUtil.createTmpFile(b, f);
+    File rbwFile = DatanodeUtil.createFileWithExistsCheck(
+        volume, b, f, fileIoProvider);
     // If any exception during creation, its expected that counter will not be
     // incremented, So no need to decrement
     incrNumBlocks();
@@ -319,12 +316,9 @@ class BlockPoolSlice {
 
   File addBlock(Block b, File f) throws IOException {
     File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
-    if (!blockDir.exists()) {
-      if (!blockDir.mkdirs()) {
-        throw new IOException("Failed to mkdirs " + blockDir);
-      }
-    }
-    File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
+    fileIoProvider.mkdirsWithExistsCheck(volume, blockDir);
+    File blockFile = ((FsDatasetImpl) volume.getDataset()).moveBlockFiles(
+        volume, b, f, blockDir);
     File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
     if (dfsUsage instanceof CachingGetSpaceUsed) {
       ((CachingGetSpaceUsed) dfsUsage).incDfsUsed(
@@ -342,9 +336,9 @@ class BlockPoolSlice {
     final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
     final File targetBlockFile = new File(blockDir, blockFile.getName());
     final File targetMetaFile = new File(blockDir, metaFile.getName());
-    FileUtils.moveFile(blockFile, targetBlockFile);
+    fileIoProvider.moveFile(volume, blockFile, targetBlockFile);
     FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
-    FileUtils.moveFile(metaFile, targetMetaFile);
+    fileIoProvider.moveFile(volume, metaFile, targetMetaFile);
     FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
     return targetBlockFile;
   }
@@ -387,16 +381,13 @@ class BlockPoolSlice {
     File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
     if (blockFile.exists()) {
       // If the original block file still exists, then no recovery is needed.
-      if (!unlinkedTmp.delete()) {
+      if (!fileIoProvider.delete(volume, unlinkedTmp)) {
         throw new IOException("Unable to cleanup unlinked tmp file " +
             unlinkedTmp);
       }
       return null;
     } else {
-      if (!unlinkedTmp.renameTo(blockFile)) {
-        throw new IOException("Unable to rename unlinked tmp file " +
-            unlinkedTmp);
-      }
+      fileIoProvider.rename(volume, unlinkedTmp, blockFile);
       return blockFile;
     }
   }
@@ -409,7 +400,7 @@ class BlockPoolSlice {
    */
   private int moveLazyPersistReplicasToFinalized(File source)
       throws IOException {
-    File files[] = FileUtil.listFiles(source);
+    File[] files = fileIoProvider.listFiles(volume, source);
     int numRecovered = 0;
     for (File file : files) {
       if (file.isDirectory()) {
@@ -424,24 +415,25 @@ class BlockPoolSlice {
         if (blockFile.exists()) {
-          if (!targetDir.exists() && !targetDir.mkdirs()) {
+          try {
+            fileIoProvider.mkdirsWithExistsCheck(volume, targetDir);
+          } catch(IOException ioe) {
             LOG.warn("Failed to mkdirs " + targetDir);
             continue;
           }
 
           final File targetMetaFile = new File(targetDir, metaFile.getName());
           try {
-            ReplicaInfo.rename(metaFile, targetMetaFile);
+            fileIoProvider.rename(volume, metaFile, targetMetaFile);
           } catch (IOException e) {
             LOG.warn("Failed to move meta file from "
                 + metaFile + " to " + targetMetaFile, e);
             continue;
-
           }
 
           final File targetBlockFile = new File(targetDir, blockFile.getName());
           try {
-            ReplicaInfo.rename(blockFile, targetBlockFile);
+            fileIoProvider.rename(volume, blockFile, targetBlockFile);
           } catch (IOException e) {
             LOG.warn("Failed to move block file from "
                 + blockFile + " to " + targetBlockFile, e);
@@ -458,7 +450,7 @@ class BlockPoolSlice {
       }
     }
 
-    FileUtil.fullyDelete(source);
+    fileIoProvider.fullyDelete(volume, source);
     return numRecovered;
   }
@@ -491,7 +483,7 @@ class BlockPoolSlice {
           loadRwr = false;
         }
         sc.close();
-        if (!restartMeta.delete()) {
+        if (!fileIoProvider.delete(volume, restartMeta)) {
           FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
               restartMeta.getPath());
         }
@@ -547,7 +539,7 @@ class BlockPoolSlice {
       final RamDiskReplicaTracker lazyWriteReplicaMap,
       boolean isFinalized)
       throws IOException {
-    File files[] = FileUtil.listFiles(dir);
+    File[] files = fileIoProvider.listFiles(volume, dir);
     for (File file : files) {
       if (file.isDirectory()) {
         addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
@@ -560,8 +552,9 @@ class BlockPoolSlice {
           continue;
         }
       }
-      if (!Block.isBlockFilename(file))
+      if (!Block.isBlockFilename(file)) {
         continue;
+      }
 
       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
           files, file);
@@ -650,11 +643,11 @@ class BlockPoolSlice {
   private void deleteReplica(final ReplicaInfo replicaToDelete) {
     // Delete the files on disk. Failure here is okay.
     final File blockFile = replicaToDelete.getBlockFile();
-    if (!blockFile.delete()) {
+    if (!fileIoProvider.delete(volume, blockFile)) {
       LOG.warn("Failed to delete block file " + blockFile);
     }
     final File metaFile = replicaToDelete.getMetaFile();
-    if (!metaFile.delete()) {
+    if (!fileIoProvider.delete(volume, metaFile)) {
       LOG.warn("Failed to delete meta file " + metaFile);
     }
   }
@@ -681,7 +674,8 @@ class BlockPoolSlice {
       return 0;
     }
     try (DataInputStream checksumIn = new DataInputStream(
-        new BufferedInputStream(new FileInputStream(metaFile),
+        new BufferedInputStream(
+            fileIoProvider.getFileInputStream(volume, metaFile),
             ioFileBufferSize))) {
       // read and handle the common header here. For now just a version
       final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
@@ -694,9 +688,10 @@ class BlockPoolSlice {
       if (numChunks == 0) {
         return 0;
       }
-      try (InputStream blockIn = new FileInputStream(blockFile);
+      try (InputStream blockIn = fileIoProvider.getFileInputStream(
+               volume, blockFile);
            ReplicaInputStreams ris = new ReplicaInputStreams(blockIn,
-               checksumIn, volume.obtainReference())) {
+               checksumIn, volume.obtainReference(), fileIoProvider)) {
         ris.skipChecksumFully((numChunks - 1) * checksumSize);
         long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
         ris.skipDataFully(lastChunkStartPos);
@@ -715,7 +710,8 @@ class BlockPoolSlice {
       // truncate if extra bytes are present without CRC
       if (blockFile.length() > validFileLength) {
         try (RandomAccessFile blockRAF =
-            new RandomAccessFile(blockFile, "rw")) {
+                 fileIoProvider.getRandomAccessFile(
+                     volume, blockFile, "rw")) {
           // truncate blockFile
           blockRAF.setLength(validFileLength);
         }
@@ -767,12 +763,14 @@ class BlockPoolSlice {
     }
     FileInputStream inputStream = null;
     try {
-      inputStream = new FileInputStream(replicaFile);
+      inputStream = fileIoProvider.getFileInputStream(volume, replicaFile);
       BlockListAsLongs blocksList =
           BlockListAsLongs.readFrom(inputStream, maxDataLength);
-      Iterator<BlockReportReplica> iterator = blocksList.iterator();
-      while (iterator.hasNext()) {
-        BlockReportReplica replica = iterator.next();
+      if (blocksList == null) {
+        return false;
+      }
+
+      for (BlockReportReplica replica : blocksList) {
         switch (replica.getState()) {
         case FINALIZED:
           addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true);
@@ -809,7 +807,7 @@ class BlockPoolSlice {
       return false;
     }
     finally {
-      if (!replicaFile.delete()) {
+      if (!fileIoProvider.delete(volume, replicaFile)) {
        LOG.info("Failed to delete replica cache file: " +
           replicaFile.getPath());
      }
@@ -823,41 +821,29 @@ class BlockPoolSlice {
        blocksListToPersist.getNumberOfBlocks()== 0) {
      return;
    }
-    File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
-    if (tmpFile.exists() && !tmpFile.delete()) {
-      LOG.warn("Failed to delete tmp replicas file in " +
-          tmpFile.getPath());
-      return;
-    }
-    File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
-    if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
-      LOG.warn("Failed to delete replicas file in " +
-          replicaCacheFile.getPath());
+    final File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
+    final File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
+    if (!fileIoProvider.deleteWithExistsCheck(volume, tmpFile) ||
+        !fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile)) {
      return;
    }
 
    FileOutputStream out = null;
    try {
-      out = new FileOutputStream(tmpFile);
+      out = fileIoProvider.getFileOutputStream(volume, tmpFile);
      blocksListToPersist.writeTo(out);
      out.close();
      // Renaming the tmp file to replicas
-      Files.move(tmpFile, replicaCacheFile);
+      fileIoProvider.moveFile(volume, tmpFile, replicaCacheFile);
    } catch (Exception e) {
      // If write failed, the volume might be bad. Since the cache file is
      // not critical, log the error, delete both the files (tmp and cache)
      // and continue.
      LOG.warn("Failed to write replicas to cache ", e);
-      if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
-        LOG.warn("Failed to delete replicas file: " +
-            replicaCacheFile.getPath());
-      }
+      fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile);
    } finally {
      IOUtils.closeStream(out);
-      if (tmpFile.exists() && !tmpFile.delete()) {
-        LOG.warn("Failed to delete tmp file in " +
-            tmpFile.getPath());
-      }
+      fileIoProvider.deleteWithExistsCheck(volume, tmpFile);
    }
  }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 36d90fd..564e1a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -232,6 +234,7 @@ class FsDatasetAsyncDiskService {
     final File metaFile;
     final ExtendedBlock block;
     final String trashDirectory;
+    private final FileIoProvider fileIoProvider;
 
     ReplicaFileDeleteTask(FsVolumeReference volumeRef, File blockFile,
         File metaFile, ExtendedBlock block, String trashDirectory) {
@@ -241,6 +244,7 @@ class FsDatasetAsyncDiskService {
       this.metaFile = metaFile;
       this.block = block;
       this.trashDirectory = trashDirectory;
+      this.fileIoProvider = volume.getFileIoProvider();
     }
 
     @Override
@@ -252,13 +256,17 @@ class FsDatasetAsyncDiskService {
     }
 
     private boolean deleteFiles() {
-      return blockFile.delete() && (metaFile.delete() || !metaFile.exists());
+      return fileIoProvider.delete(volume, blockFile) &&
+          (fileIoProvider.delete(volume, metaFile) ||
+              !fileIoProvider.exists(volume, metaFile));
     }
 
     private boolean moveFiles() {
       File trashDirFile = new File(trashDirectory);
-      if (!trashDirFile.exists() && !trashDirFile.mkdirs()) {
-        LOG.error("Failed to create trash directory " + trashDirectory);
+      try {
+        fileIoProvider.mkdirsWithExistsCheck(
+            volume, trashDirFile);
+      } catch (IOException e) {
         return false;
       }
 
@@ -269,8 +277,14 @@ class FsDatasetAsyncDiskService {
 
       File newBlockFile = new File(trashDirectory, blockFile.getName());
       File newMetaFile = new File(trashDirectory, metaFile.getName());
-      return (blockFile.renameTo(newBlockFile) &&
-          metaFile.renameTo(newMetaFile));
+
+      try {
+        fileIoProvider.renameTo(volume, blockFile, newBlockFile);
+        fileIoProvider.renameTo(volume, metaFile, newMetaFile);
+        return true;
+      } catch(IOException ioe) {
+        return false;
+      }
     }
 
     @Override
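[Editor's note: moveFiles() changes shape here — the old boolean-returning File.renameTo() calls become provider methods that throw, and the task converts the exception back into the boolean its caller expects. A small sketch of that adapter pattern; RenameAdapter is illustrative, not part of the patch:

import java.io.File;
import java.io.IOException;

final class RenameAdapter {
  // An exception-throwing rename, standing in for the provider method.
  static void renameTo(File from, File to) throws IOException {
    if (!from.renameTo(to)) {
      throw new IOException("Failed to rename " + from + " to " + to);
    }
  }

  // Mirrors the patched moveFiles(): any rename failure yields false.
  static boolean moveBoth(File block, File meta, File trashDir) {
    try {
      renameTo(block, new File(trashDir, block.getName()));
      renameTo(meta, new File(trashDir, meta.getName()));
      return true;
    } catch (IOException ioe) {
      return false;
    }
  }
}]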
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 7cee8f4..c821989 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -17,38 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.*;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -57,13 +29,12 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -74,11 +45,10 @@ import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
@@ -101,6 +71,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -115,6 +86,7 @@ import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -124,9 +96,34 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Timer;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
 
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
@@ -195,10 +192,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
     try(AutoCloseableLock lock = datasetLock.acquire()) {
-      File blockfile = getFile(bpid, blkid, false);
+      File blockfile = null;
+
+      ReplicaInfo info = volumeMap.get(bpid, blkid);
+      if (info != null) {
+        blockfile = info.getBlockFile();
+      }
       if (blockfile == null) {
         return null;
       }
+
       final File metafile = FsDatasetUtil.findMetaFile(blockfile);
       final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
       return new Block(blkid, blockfile.length(), gs);
@@ -233,17 +236,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
       throws IOException {
     File meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp());
+    FsVolumeSpi volume = null;
+
     if (meta == null || !meta.exists()) {
       return null;
     }
+
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final ReplicaInfo replicaInfo = getReplicaInfo(b);
+      if (replicaInfo != null) {
+        volume = replicaInfo.getVolume();
+      }
+    }
+
     if (isNativeIOAvailable) {
       return new LengthInputStream(
-          NativeIO.getShareDeleteFileInputStream(meta),
+          datanode.getFileIoProvider().getShareDeleteFileInputStream(
+              volume, meta, 0),
           meta.length());
     }
-    return new LengthInputStream(new FileInputStream(meta), meta.length());
+    return new LengthInputStream(
+        datanode.getFileIoProvider().getFileInputStream(volume, meta),
+        meta.length());
   }
-
+
   final DataNode datanode;
   final DataStorage dataStorage;
   private final FsVolumeList volumes;
@@ -758,42 +774,49 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return f;
   }
 
-  /**
-   * Return the File associated with a block, without first
-   * checking that it exists. This should be used when the
-   * next operation is going to open the file for read anyway,
-   * and thus the exists check is redundant.
-   *
-   * @param touch if true then update the last access timestamp of the
-   *              block. Currently used for blocks on transient storage.
-   */
-  private File getBlockFileNoExistsCheck(ExtendedBlock b,
-                                         boolean touch)
-      throws IOException {
-    final File f;
-    try(AutoCloseableLock lock = datasetLock.acquire()) {
-      f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
+  @Override // FsDatasetSpi
+  public InputStream getBlockInputStream(ExtendedBlock b,
+      long seekOffset) throws IOException {
+    ReplicaInfo info;
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }
-    if (f == null) {
-      throw new IOException("Block " + b + " is not valid");
+
+    final File blockFile = info != null ? info.getBlockFile() : null;
+
+    if (blockFile != null && info.getVolume().isTransientStorage()) {
+      ramDiskReplicaTracker.touch(b.getBlockPoolId(), b.getBlockId());
+      datanode.getMetrics().incrRamDiskBlocksReadHits();
+    }
+
+    if(blockFile != null &&
+        datanode.getFileIoProvider().exists(
+            info.getVolume(), blockFile)) {
+      return getDataInputStream(info, seekOffset);
+    } else {
+      throw new IOException("Block " + b + " is not valid. " +
+          "Expected block file at " + blockFile + " does not exist.");
     }
-    return f;
   }
 
-  @Override // FsDatasetSpi
-  public InputStream getBlockInputStream(ExtendedBlock b,
-      long seekOffset) throws IOException {
-    File blockFile = getBlockFileNoExistsCheck(b, true);
-    if (isNativeIOAvailable) {
-      return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
+  private InputStream getDataInputStream(
+      ReplicaInfo info, long seekOffset) throws IOException {
+    FileInputStream fis;
+    final File blockFile = info.getBlockFile();
+    final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
+    if (NativeIO.isAvailable()) {
+      fis = fileIoProvider.getShareDeleteFileInputStream(
+          info.getVolume(), blockFile, seekOffset);
     } else {
       try {
-        return openAndSeek(blockFile, seekOffset);
+        fis = fileIoProvider.openAndSeek(
+            info.getVolume(), blockFile, seekOffset);
       } catch (FileNotFoundException fnfe) {
-        throw new IOException("Block " + b + " is not valid. " +
-            "Expected block file at " + blockFile + " does not exist.");
+        throw new IOException("Expected block file at " + blockFile +
+            " does not exist.");
       }
     }
+    return fis;
   }
 
   /**
@@ -846,53 +869,40 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
     try(AutoCloseableLock lock = datasetLock.acquire()) {
-      ReplicaInfo info = getReplicaInfo(b);
+      final ReplicaInfo info = getReplicaInfo(b);
+      final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
       FsVolumeReference ref = info.getVolume().obtainReference();
+      InputStream blockInStream = null;
+      InputStream metaInStream = null;
       try {
-        InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
-        try {
-          InputStream metaInStream =
-              openAndSeek(info.getMetaFile(), metaOffset);
-          return new ReplicaInputStreams(blockInStream, metaInStream, ref);
-        } catch (IOException e) {
-          IOUtils.cleanup(null, blockInStream);
-          throw e;
-        }
+        blockInStream = fileIoProvider.openAndSeek(
+            info.getVolume(), info.getBlockFile(), blkOffset);
+        metaInStream = fileIoProvider.openAndSeek(
+            info.getVolume(), info.getMetaFile(), metaOffset);
+        return new ReplicaInputStreams(
+            blockInStream, metaInStream, ref, fileIoProvider);
      } catch (IOException e) {
-        IOUtils.cleanup(null, ref);
+        IOUtils.cleanup(null, ref, blockInStream);
        throw e;
      }
    }
  }
 
-  private static FileInputStream openAndSeek(File file, long offset)
-      throws IOException {
-    RandomAccessFile raf = null;
-    try {
-      raf = new RandomAccessFile(file, "r");
-      if (offset > 0) {
-        raf.seek(offset);
-      }
-      return new FileInputStream(raf.getFD());
-    } catch(IOException ioe) {
-      IOUtils.cleanup(null, raf);
-      throw ioe;
-    }
-  }
-
-  static File moveBlockFiles(Block b, File srcfile, File destdir)
-      throws IOException {
+  File moveBlockFiles(
+      FsVolumeSpi volume, Block b, File srcfile,
+      File destdir) throws IOException {
    final File dstfile = new File(destdir, b.getBlockName());
    final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
    final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
+    final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
    try {
-      NativeIO.renameTo(srcmeta, dstmeta);
+      fileIoProvider.renameTo(volume, srcmeta, dstmeta);
    } catch (IOException e) {
      throw new IOException("Failed to move meta file for " + b
          + " from " + srcmeta + " to " + dstmeta, e);
    }
    try {
-      NativeIO.renameTo(srcfile, dstfile);
+      fileIoProvider.renameTo(volume, srcfile, dstfile);
    } catch (IOException e) {
      throw new IOException("Failed to move block file for " + b
          + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
@@ -1031,8 +1041,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
  static void computeChecksum(File srcMeta, File dstMeta,
      File blockFile, int smallBufferSize, final Configuration conf)
      throws IOException {
-    final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta,
-        DFSUtilClient.getIoFileBufferSize(conf));
+    DataChecksum checksum;
+
+    try (FileInputStream fis = new FileInputStream(srcMeta)) {
+      checksum = BlockMetadataHeader.readDataChecksum(fis,
+          DFSUtilClient.getIoFileBufferSize(conf), srcMeta);
+    }
+
    final byte[] data = new byte[1 << 16];
    final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
@@ -1051,7 +1066,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      int offset = 0;
      try (InputStream dataIn = isNativeIOAvailable ?
-          NativeIO.getShareDeleteFileInputStream(blockFile) :
+          new FileInputStream(NativeIO.getShareDeleteFileDescriptor(
+              blockFile, 0)) :
          new FileInputStream(blockFile)) {
 
        for (int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
@@ -1078,7 +1094,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    }
  }
 
-  static private void truncateBlock(File blockFile, File metaFile,
+  private void truncateBlock(FsVolumeSpi volume, File blockFile, File metaFile,
      long oldlen, long newlen) throws IOException {
    LOG.info("truncateBlock: blockFile=" + blockFile
        + ", metaFile=" + metaFile
@@ -1093,7 +1109,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
          + ") to newlen (=" + newlen + ")");
    }
 
-    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+    final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
+    DataChecksum dcs;
+    try (FileInputStream fis = fileIoProvider.getFileInputStream(
+        volume, metaFile)) {
+      dcs = BlockMetadataHeader.readHeader(fis).getChecksum();
+    }
+
    int checksumsize = dcs.getChecksumSize();
    int bpc = dcs.getBytesPerChecksum();
    long n = (newlen - 1)/bpc + 1;
@@ -1102,16 +1124,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    int lastchunksize = (int)(newlen - lastchunkoffset);
    byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
 
-    RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
-    try {
+
+    try (final RandomAccessFile blockRAF = fileIoProvider.getRandomAccessFile(
+        volume, blockFile, "rw")) {
      //truncate blockFile
      blockRAF.setLength(newlen);
 
      //read last chunk
      blockRAF.seek(lastchunkoffset);
      blockRAF.readFully(b, 0, lastchunksize);
-    } finally {
-      blockRAF.close();
    }
 
    //compute checksum
@@ -1119,13 +1140,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    dcs.writeValue(b, 0, false);
 
    //update metaFile
-    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
-    try {
+    try (final RandomAccessFile metaRAF = fileIoProvider.getRandomAccessFile(
+        volume, metaFile, "rw")) {
      metaRAF.setLength(newmetalen);
      metaRAF.seek(newmetalen - checksumsize);
      metaRAF.write(b, 0, checksumsize);
-    } finally {
-      metaRAF.close();
    }
  }
@@ -1220,7 +1239,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
    }
    try {
-      NativeIO.renameTo(oldmeta, newmeta);
+      datanode.getFileIoProvider().renameTo(
+          replicaInfo.getVolume(), oldmeta, newmeta);
    } catch (IOException e) {
      throw new IOException("Block " + replicaInfo + " reopen failed. " +
                            " Unable to move meta file  " + oldmeta +
@@ -1233,10 +1253,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
          + ", file length=" + blkfile.length());
    }
    try {
-      NativeIO.renameTo(blkfile, newBlkFile);
+      datanode.getFileIoProvider().renameTo(
+          replicaInfo.getVolume(), blkfile, newBlkFile);
    } catch (IOException e) {
      try {
-        NativeIO.renameTo(newmeta, oldmeta);
+        datanode.getFileIoProvider().renameTo(
+            replicaInfo.getVolume(), newmeta, oldmeta);
      } catch (IOException ex) {
        LOG.warn("Cannot move meta file " + newmeta +
            "back to the finalized directory " + oldmeta, ex);
@@ -1389,7 +1411,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
    }
    try {
-      NativeIO.renameTo(oldmeta, newmeta);
+      datanode.getFileIoProvider().renameTo(
+          replicaInfo.getVolume(), oldmeta, newmeta);
    } catch (IOException e) {
      replicaInfo.setGenerationStamp(oldGS); // restore old GS
      throw new IOException("Block " + replicaInfo + " reopen failed. " +
@@ -1520,7 +1543,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      // any corrupt data written after the acked length can go unnoticed.
      if (numBytes > bytesAcked) {
        final File replicafile = rbw.getBlockFile();
-        truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+        truncateBlock(
+            rbw.getVolume(), replicafile, rbw.getMetaFile(),
+            numBytes, bytesAcked);
        rbw.setNumBytes(bytesAcked);
        rbw.setLastChecksumAndDataLen(bytesAcked, null);
      }
@@ -1585,8 +1610,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
      // move block files to the rbw directory
      BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
-      final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(),
-          bpslice.getRbwDir());
+      final File dest = moveBlockFiles(
+          v, b.getLocalBlock(), temp.getBlockFile(), bpslice.getRbwDir());
      // create RBW
      final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
          blockId, numBytes, expectedGs,
@@ -2318,16 +2343,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      return;
    }
 
-    final long diskGS = diskMetaFile != null && diskMetaFile.exists() ?
+    final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
+    final boolean diskMetaFileExists = diskMetaFile != null &&
+        fileIoProvider.exists(vol, diskMetaFile);
+    final boolean diskFileExists = diskFile != null &&
+        fileIoProvider.exists(vol, diskFile);
+
+    final long diskGS = diskMetaFileExists ?
        Block.getGenerationStamp(diskMetaFile.getName()) :
-        HdfsConstants.GRANDFATHER_GENERATION_STAMP;
+        HdfsConstants.GRANDFATHER_GENERATION_STAMP;
 
-    if (diskFile == null || !diskFile.exists()) {
+    if (!diskFileExists) {
      if (memBlockInfo == null) {
        // Block file does not exist and block does not exist in memory
        // If metadata file exists then delete it
-        if (diskMetaFile != null && diskMetaFile.exists()
-            && diskMetaFile.delete()) {
+        if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) {
          LOG.warn("Deleted a metadata file without a block "
              + diskMetaFile.getAbsolutePath());
        }
@@ -2343,8 +2373,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
        LOG.warn("Removed block " + blockId
            + " from memory with missing block file on the disk");
        // Finally remove the metadata file
-        if (diskMetaFile != null && diskMetaFile.exists()
-            && diskMetaFile.delete()) {
+        if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) {
          LOG.warn("Deleted a metadata file for the deleted block "
              + diskMetaFile.getAbsolutePath());
        }
@@ -2374,10 +2403,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      */
    // Compare block files
    File memFile = memBlockInfo.getBlockFile();
-    if (memFile.exists()) {
+    final boolean memFileExists = memFile != null &&
+        fileIoProvider.exists(vol, memFile);
+    if (memFileExists) {
      if (memFile.compareTo(diskFile) != 0) {
        if (diskMetaFile.exists()) {
-          if (memBlockInfo.getMetaFile().exists()) {
+          if (fileIoProvider.exists(vol, memBlockInfo.getMetaFile())) {
            // We have two sets of block+meta files. Decide which one to
            // keep.
            ReplicaInfo diskBlockInfo = new FinalizedReplica(
@@ -2386,7 +2417,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                memBlockInfo, diskBlockInfo, volumeMap);
          }
        } else {
-          if (!diskFile.delete()) {
+          if (!fileIoProvider.delete(vol, diskFile)) {
            LOG.warn("Failed to delete " + diskFile + ". Will retry on next scan");
          }
        }
@@ -2410,9 +2441,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
      // Compare generation stamp
      if (memBlockInfo.getGenerationStamp() != diskGS) {
-        File memMetaFile = FsDatasetUtil.getMetaFile(diskFile,
+        File memMetaFile = FsDatasetUtil.getMetaFile(diskFile,
            memBlockInfo.getGenerationStamp());
-        if (memMetaFile.exists()) {
+        if (fileIoProvider.exists(vol, memMetaFile)) {
          if (memMetaFile.compareTo(diskMetaFile) != 0) {
            LOG.warn("Metadata file in memory "
                + memMetaFile.getAbsolutePath()
@@ -2663,7 +2694,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    }
    if (rur.getNumBytes() > newlength) {
      rur.breakHardLinksIfNeeded();
-      truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
+      truncateBlock(
+          rur.getVolume(), blockFile, metaFile,
+          rur.getNumBytes(), newlength);
      if(!copyOnTruncate) {
        // update RUR with the new length
        rur.setNumBytes(newlength);
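[Editor's note: truncateBlock() above locates the last partial checksum chunk purely by arithmetic before rewriting it. A worked example of that math, assuming CRC32 (4-byte checksums), 512-byte chunks, and the 7-byte meta header that current BlockMetadataHeader layouts use — the header size is an assumption, not shown in this diff:

public class TruncateMath {
  public static void main(String[] args) {
    long newlen = 1300;   // truncate the block to 1300 bytes
    int bpc = 512;        // bytes per checksum chunk
    int checksumsize = 4; // CRC32
    long n = (newlen - 1) / bpc + 1;                      // 3 chunks remain
    long lastchunkoffset = (n - 1) * bpc;                 // 1024
    int lastchunksize = (int) (newlen - lastchunkoffset); // 276 bytes
    long newmetalen = 7 + n * checksumsize;               // 19 bytes
    System.out.printf("n=%d lastchunkoffset=%d lastchunksize=%d newmetalen=%d%n",
        n, lastchunkoffset, lastchunksize, newmetalen);
  }
}

Only the final 276-byte chunk needs its checksum recomputed; the meta file is then shortened to hold exactly n checksums.]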
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 4af8773..40ccfc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -18,8 +18,10 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
+import java.io.FileDescriptor;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Arrays;
 
 import com.google.common.base.Preconditions;
@@ -28,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.io.IOUtils;
 
 /** Utility methods. */
 @InterfaceAudience.Private
@@ -73,6 +76,21 @@ public class FsDatasetUtil {
     return matches[0];
   }
 
+  public static FileDescriptor openAndSeek(File file, long offset)
+      throws IOException {
+    RandomAccessFile raf = null;
+    try {
+      raf = new RandomAccessFile(file, "r");
+      if (offset > 0) {
+        raf.seek(offset);
+      }
+      return raf.getFD();
+    } catch(IOException ioe) {
+      IOUtils.cleanup(null, raf);
+      throw ioe;
+    }
+  }
+
   /**
    * Find the meta-file for the specified block file
    * and then return the generation stamp from the name of the meta-file.
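[Editor's note: the new FsDatasetUtil.openAndSeek() deliberately leaves the RandomAccessFile open on success, because closing it would invalidate the returned descriptor. A caller-side sketch of how such a descriptor can back a stream — OpenAndSeekDemo and openBlockAt are hypothetical:

import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;

class OpenAndSeekDemo {
  // Open a block file with its read position already advanced; a
  // FileInputStream built from the descriptor shares that position,
  // so reads start at `offset`.
  static InputStream openBlockAt(File blockFile, long offset)
      throws IOException {
    FileDescriptor fd = FsDatasetUtil.openAndSeek(blockFile, offset);
    return new FileInputStream(fd);
  }
}]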
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index ecd72ff..9955bf4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -19,13 +19,12 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileOutputStream;
+import java.io.FileInputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
 import java.nio.channels.ClosedChannelException;
-import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.Collections;
@@ -44,8 +43,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -60,7 +59,6 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.StringUtils;
@@ -111,6 +109,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   // limit the visible capacity for tests. If negative, then we just
   // query from the filesystem.
   protected volatile long configuredCapacity;
+  private final FileIoProvider fileIoProvider;
 
   /**
    * Per-volume worker pool that processes new blocks to cache.
@@ -134,6 +133,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.usage = new DF(parent, conf);
     this.storageType = storageType;
     this.configuredCapacity = -1;
+    // dataset.datanode may be null in some tests.
+    this.fileIoProvider = dataset.datanode != null ?
+        dataset.datanode.getFileIoProvider() : new FileIoProvider(conf);
     cacheExecutor = initializeCacheExecutor(parent);
   }
 
@@ -622,8 +624,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
      */
     private String getNextSubDir(String prev, File dir)
           throws IOException {
-      List<String> children =
-          IOUtils.listDirectory(dir, SubdirFilter.INSTANCE);
+      List<String> children = fileIoProvider.listDirectory(
+          FsVolumeImpl.this, dir, SubdirFilter.INSTANCE);
       cache = null;
       cacheMs = 0;
       if (children.size() == 0) {
@@ -676,8 +678,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
       }
       File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized",
           state.curFinalizedDir, state.curFinalizedSubDir).toFile();
-      List<String> entries =
-          IOUtils.listDirectory(dir, BlockFileFilter.INSTANCE);
+      List<String> entries = fileIoProvider.listDirectory(
+          FsVolumeImpl.this, dir, BlockFileFilter.INSTANCE);
       if (entries.size() == 0) {
         entries = null;
       } else {
@@ -784,19 +786,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
     public void save() throws IOException {
       state.lastSavedMs = Time.now();
       boolean success = false;
-      try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
-          new FileOutputStream(getTempSaveFile(), false), "UTF-8"))) {
+      try (BufferedWriter writer = new BufferedWriter(
+          new OutputStreamWriter(fileIoProvider.getFileOutputStream(
+              FsVolumeImpl.this, getTempSaveFile()), "UTF-8"))) {
         WRITER.writeValue(writer, state);
         success = true;
       } finally {
         if (!success) {
-          if (getTempSaveFile().delete()) {
-            LOG.debug("save({}, {}): error deleting temporary file.",
-                storageID, bpid);
-          }
+          fileIoProvider.delete(FsVolumeImpl.this, getTempSaveFile());
         }
       }
-      Files.move(getTempSaveFile().toPath(), getSaveFile().toPath(),
+      fileIoProvider.move(FsVolumeImpl.this,
+          getTempSaveFile().toPath(), getSaveFile().toPath(),
           StandardCopyOption.ATOMIC_MOVE);
       if (LOG.isTraceEnabled()) {
         LOG.trace("save({}, {}): saved {}", storageID, bpid,
@@ -982,11 +983,12 @@ public class FsVolumeImpl implements FsVolumeSpi {
     File finalizedDir = new File(bpCurrentDir,
         DataStorage.STORAGE_DIR_FINALIZED);
     File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
-    if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
-        finalizedDir)) {
+    if (fileIoProvider.exists(this, finalizedDir) &&
+        !DatanodeUtil.dirNoFilesRecursive(this, finalizedDir, fileIoProvider)) {
       return false;
     }
-    if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
+    if (fileIoProvider.exists(this, rbwDir) &&
+        fileIoProvider.list(this, rbwDir).length != 0) {
       return false;
     }
     return true;
@@ -1007,35 +1009,38 @@ public class FsVolumeImpl implements FsVolumeSpi {
         DataStorage.STORAGE_DIR_LAZY_PERSIST);
     File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
     if (force) {
-      DataStorage.fullyDelete(bpDir);
+      fileIoProvider.fullyDelete(this, bpDir);
     } else {
-      if (!rbwDir.delete()) {
+      if (!fileIoProvider.delete(this, rbwDir)) {
         throw new IOException("Failed to delete " + rbwDir);
       }
-      if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
-          !FileUtil.fullyDelete(finalizedDir)) {
+      if (!DatanodeUtil.dirNoFilesRecursive(
+          this, finalizedDir, fileIoProvider) ||
+          !fileIoProvider.fullyDelete(
+              this, finalizedDir)) {
        throw new IOException("Failed to delete " + finalizedDir);
      }
      if (lazypersistDir.exists() &&
-        ((!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) ||
-            !FileUtil.fullyDelete(lazypersistDir)))) {
+        ((!DatanodeUtil.dirNoFilesRecursive(
+            this, lazypersistDir, fileIoProvider) ||
+            !fileIoProvider.fullyDelete(this, lazypersistDir)))) {
        throw new IOException("Failed to delete " + lazypersistDir);
      }
-      DataStorage.fullyDelete(tmpDir);
-      for (File f : FileUtil.listFiles(bpCurrentDir)) {
-        if (!f.delete()) {
+      fileIoProvider.fullyDelete(this, tmpDir);
+      for (File f : fileIoProvider.listFiles(this, bpCurrentDir)) {
+        if (!fileIoProvider.delete(this, f)) {
          throw new IOException("Failed to delete " + f);
        }
      }
-      if (!bpCurrentDir.delete()) {
+      if (!fileIoProvider.delete(this, bpCurrentDir)) {
        throw new IOException("Failed to delete " + bpCurrentDir);
      }
-      for (File f : FileUtil.listFiles(bpDir)) {
-        if (!f.delete()) {
+      for (File f : fileIoProvider.listFiles(this, bpDir)) {
+        if (!fileIoProvider.delete(this, f)) {
          throw new IOException("Failed to delete " + f);
        }
      }
-      if (!bpDir.delete()) {
+      if (!fileIoProvider.delete(this, bpDir)) {
        throw new IOException("Failed to delete " + bpDir);
      }
    }
@@ -1059,9 +1064,13 @@ public class FsVolumeImpl implements FsVolumeSpi {
  @Override
  public byte[] loadLastPartialChunkChecksum(
      File blockFile, File metaFile) throws IOException {
-    // readHeader closes the temporary FileInputStream.
-    DataChecksum dcs = BlockMetadataHeader
-        .readHeader(metaFile).getChecksum();
+    DataChecksum dcs;
+
+    try (FileInputStream fis = fileIoProvider.getFileInputStream(
+        this, metaFile)) {
+      dcs = BlockMetadataHeader.readHeader(fis).getChecksum();
+    }
+
    final int checksumSize = dcs.getChecksumSize();
    final long onDiskLen = blockFile.length();
    final int bytesPerChecksum = dcs.getBytesPerChecksum();
@@ -1075,7 +1084,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
    long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
        (onDiskLen / bytesPerChecksum) * checksumSize;
    byte[] lastChecksum = new byte[checksumSize];
-    try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
+    try (RandomAccessFile raf = fileIoProvider.getRandomAccessFile(
+        this, metaFile, "r")) {
      raf.seek(offsetInChecksum);
      int readBytes = raf.read(lastChecksum, 0, checksumSize);
      if (readBytes == -1) {
@@ -1090,4 +1100,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
    }
    return lastChecksum;
  }
+
+  @Override
+  public FileIoProvider getFileIoProvider() {
+    return fileIoProvider;
+  }
 }
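[Editor's note: loadLastPartialChunkChecksum() above finds the checksum of the trailing partial chunk by seeking into the meta file at a computed offset. A compact sketch of that logic against a plain meta file, without the provider indirection; the 7-byte header size is an assumption based on current BlockMetadataHeader layout:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

class LastChunkChecksum {
  // E.g. with 512-byte chunks, 4-byte CRC32 checksums, and 1300 bytes
  // on disk, the partial third chunk's checksum starts at byte
  // 7 + (1300 / 512) * 4 = 15 of the meta file.
  static byte[] read(File metaFile, long onDiskLen,
      int bytesPerChecksum, int checksumSize) throws IOException {
    long offsetInChecksum = 7
        + (onDiskLen / bytesPerChecksum) * checksumSize;
    byte[] lastChecksum = new byte[checksumSize];
    try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
      raf.seek(offsetInChecksum);
      if (raf.read(lastChecksum, 0, checksumSize) == -1) {
        throw new IOException("Meta file truncated: " + metaFile);
      }
    }
    return lastChecksum;
  }
}]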
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index 0bb95d2..fbf09fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -702,7 +702,7 @@ public class TestFileAppend{
       ReplicaBeingWritten rbw =
           (ReplicaBeingWritten)replicaHandler.getReplica();
       ReplicaOutputStreams
-          outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300);
+          outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM);
       OutputStream dataOutput = outputStreams.getDataOut();
 
       byte[] appendBytes = new byte[1];
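[Editor's note: with the slowLogThresholdMs parameter gone from createStreams(), callers throughout the tests shrink to the two-argument form. A compilable sketch of the new caller shape — CreateStreamsDemo and appendOneByte are illustrative:

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.util.DataChecksum;

class CreateStreamsDemo {
  // The threshold argument is gone; slow-IO instrumentation now lives
  // behind the volume's FileIoProvider rather than in each caller.
  static void appendOneByte(ReplicaInPipelineInterface rbw,
      DataChecksum checksum) throws IOException {
    try (ReplicaOutputStreams streams = rbw.createStreams(false, checksum)) {
      OutputStream dataOut = streams.getDataOut();
      dataOut.write('a');
      streams.flushDataOut();
    }
  }
}]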
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 80481c2..c10d84b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -116,7 +116,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       DatanodeStorage.State.NORMAL;
 
   private final AutoCloseableLock datasetLock;
-
+  private final FileIoProvider fileIoProvider;
+
   static final byte[] nullCrcFileData;
   static {
     DataChecksum checksum = DataChecksum.newDataChecksum(
@@ -254,8 +255,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
 
     @Override
-    synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
-        DataChecksum requestedChecksum, long slowLogThresholdMs)
+    synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
+        DataChecksum requestedChecksum)
         throws IOException {
       if (finalized) {
         throw new IOException("Trying to write to a finalized replica "
@@ -263,7 +264,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       } else {
         SimulatedOutputStream crcStream = new SimulatedOutputStream();
         return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
-            volume.isTransientStorage(), slowLogThresholdMs);
+            volume, fileIoProvider);
       }
     }
@@ -444,12 +445,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
           map.get(bpid).getUsed(), 0L);
     }
   }
-
+
   static class SimulatedVolume implements FsVolumeSpi {
     private final SimulatedStorage storage;
+    private final FileIoProvider fileIoProvider;
 
-    SimulatedVolume(final SimulatedStorage storage) {
+    SimulatedVolume(final SimulatedStorage storage,
+        final FileIoProvider fileIoProvider) {
       this.storage = storage;
+      this.fileIoProvider = fileIoProvider;
     }
 
     @Override
@@ -532,6 +536,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
 
     @Override
+    public FileIoProvider getFileIoProvider() {
+      return fileIoProvider;
+    }
+
+    @Override
     public VolumeCheckResult check(VolumeCheckContext context)
         throws Exception {
       return VolumeCheckResult.HEALTHY;
@@ -562,10 +571,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
 
     registerMBean(datanodeUuid);
+    this.fileIoProvider = new FileIoProvider(conf);
     this.storage = new SimulatedStorage(
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
         conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
-    this.volume = new SimulatedVolume(this.storage);
+    this.volume = new SimulatedVolume(this.storage, this.fileIoProvider);
     this.datasetLock = new AutoCloseableLock();
   }
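[Editor's note: SimulatedFSDataset builds a single FileIoProvider from the configuration and injects it into its volume, mirroring how production volumes expose theirs through getFileIoProvider(). A stripped-down sketch of that constructor-injection wiring; IoProvider and SimVolume are stand-ins for the real classes:

// A placeholder for FileIoProvider, whose API is not part of this diff.
interface IoProvider { }

class SimVolume {
  private final IoProvider fileIoProvider;

  // One provider instance is created by the dataset and shared with
  // the volume, so tests exercise the same IO entry points as
  // production volumes.
  SimVolume(IoProvider fileIoProvider) {
    this.fileIoProvider = fileIoProvider;
  }

  IoProvider getFileIoProvider() {
    return fileIoProvider;
  }
}]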
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -642,7 +642,7 @@ public class TestBlockRecovery { ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, - DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300); + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); streams.getChecksumOut().write('a'); dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1)); BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index a2b6398..cda6066 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -887,6 +887,11 @@ public class TestDirectoryScanner { } @Override + public FileIoProvider getFileIoProvider() { + return null; + } + + @Override public VolumeCheckResult check(VolumeCheckContext context) throws Exception { return VolumeCheckResult.HEALTHY; http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index 84e9180..dd7d239 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -83,7 +83,7 @@ public class TestSimulatedFSDataset { ReplicaInPipelineInterface bInfo = fsdataset.createRbw( StorageType.DEFAULT, b, false).getReplica(); ReplicaOutputStreams out = bInfo.createStreams(true, - DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300); + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); try { OutputStream dataOut = out.getDataOut(); assertEquals(0, fsdataset.getLength(b)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index ab572d0..7548e83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -133,7 
+133,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> { @Override public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, long ckoff) throws IOException { - return new ReplicaInputStreams(null, null, null); + return new ReplicaInputStreams(null, null, null, null); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java index 3e2fd7d..69ab207 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java @@ -57,10 +57,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface { @Override public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) + DataChecksum requestedChecksum) throws IOException { - return new ReplicaOutputStreams(null, null, requestedChecksum, false, - slowLogThresholdMs); + return new ReplicaOutputStreams(null, null, requestedChecksum, + null, null); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java index 5c62b5b..ff18e84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; @@ -110,6 +111,11 @@ public class ExternalVolumeImpl implements FsVolumeSpi { } @Override + public FileIoProvider getFileIoProvider() { + return null; + } + + @Override public VolumeCheckResult check(VolumeCheckContext context) throws Exception { return VolumeCheckResult.HEALTHY; http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 1601259..7484a40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.server.datanode.DNConf; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; @@ -164,6 +165,8 @@ public class TestFsDatasetImpl { this.conf = new Configuration(); this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0); + final FileIoProvider fileIoProvider = new FileIoProvider(conf); + when(datanode.getFileIoProvider()).thenReturn(fileIoProvider); when(datanode.getConf()).thenReturn(conf); final DNConf dnConf = new DNConf(datanode); when(datanode.getDnConf()).thenReturn(dnConf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dae26/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java index 92019f1..7749c5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java @@ -103,6 +103,8 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase { .add(DFSConfigKeys.DFS_DATANODE_STARTUP_KEY); configurationPropsToSkipCompare .add(DFSConfigKeys.DFS_NAMENODE_STARTUP_KEY); + configurationPropsToSkipCompare .add(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY); // Allocate xmlPropsToSkipCompare = new HashSet<String>();
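
A few notes on the pattern in this change set follow; the sketches below are illustrative and are not part of the commit.

The core idea visible in the FsVolumeImpl and ReplicaOutputStreams hunks: file operations (delete, listFiles, sync, flush, syncFileRange, posixFadvise) are no longer invoked and timed inline against slowLogThresholdMs, but routed through a per-datanode FileIoProvider, keyed by the volume being touched. The sketch below shows the delegation idea only; the method names mirror calls visible in the diff (delete, sync), but the Listener hook and timing logic are assumptions about how such a provider can centralize instrumentation, not the actual Hadoop implementation (the real hook is pluggable, per the DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY added to the config-skip list above).

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;

    class FileIoProviderSketch {
      /** Hypothetical observer hook, for illustration only. */
      interface Listener {
        void afterFileIo(Object volume, String op, long durationMs);
      }

      private final Listener listener;

      FileIoProviderSketch(Listener listener) {
        this.listener = listener;
      }

      /** Mirrors fileIoProvider.delete(volume, f) in the diff: the raw
       *  File#delete is wrapped so the listener can observe it. */
      boolean delete(Object volume, File f) {
        long begin = System.nanoTime();
        boolean deleted = f.delete();
        listener.afterFileIo(volume, "DELETE",
            (System.nanoTime() - begin) / 1_000_000);
        return deleted;
      }

      /** Mirrors fileIoProvider.sync(volume, fos): the same force(true)
       *  the old inline ReplicaOutputStreams#sync used, now observable. */
      void sync(Object volume, FileOutputStream fos) throws IOException {
        long begin = System.nanoTime();
        fos.getChannel().force(true);
        listener.afterFileIo(volume, "SYNC",
            (System.nanoTime() - begin) / 1_000_000);
      }
    }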
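The knock-on signature change shows up at every createStreams() call site in the tests above (TestFileAppend, TestBlockRecovery, TestSimulatedFSDataset, ExternalReplicaInPipeline): the trailing slowLogThresholdMs argument is gone, since slow-I/O logging is now the provider's responsibility. A hedged before/after sketch, assuming the branch-2 test classpath and the import paths implied by the diff context:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
    import org.apache.hadoop.util.DataChecksum;

    class CreateStreamsUsage {
      static ReplicaOutputStreams open(ReplicaInPipelineInterface replica)
          throws IOException {
        // Before this commit: replica.createStreams(true, checksum, 300)
        // -- the literal 300 was a slow-log threshold in milliseconds.
        return replica.createStreams(true,
            DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
      }
    }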
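The rewritten loadLastPartialChunkChecksum in FsVolumeImpl deserves a worked example, since its offset arithmetic is easy to misread. With CRC32 (4-byte checksums), 512 bytes per checksum chunk, and a block file 1000 bytes long, exactly one complete chunk precedes the partial one, so the wanted checksum sits one checksum-width past the metadata header. The header size below is an assumption (the customary 7 bytes: 2-byte version plus 5-byte checksum header); the division is integer division, exactly as in the diff:

    class PartialChunkOffsetExample {
      public static void main(String[] args) {
        long headerSize = 7;        // BlockMetadataHeader.getHeaderSize();
                                    // assumed 2-byte version + 5-byte header
        int bytesPerChecksum = 512; // dcs.getBytesPerChecksum()
        int checksumSize = 4;       // CRC32 checksums are 4 bytes
        long onDiskLen = 1000;      // replica ends partway through chunk #2

        // 1000 / 512 == 1 full chunk, so skip one checksum past the header.
        long offsetInChecksum =
            headerSize + (onDiskLen / bytesPerChecksum) * checksumSize;
        System.out.println(offsetInChecksum); // prints 11
      }
    }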
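FsVolumeSpi gains getFileIoProvider(), so every implementation, including test doubles, must answer it. The hunks above show two strategies: stub volumes that never touch disk (TestDirectoryScanner's volume, ExternalVolumeImpl) return null, while SimulatedFSDataset wires a real provider built from the configuration into its SimulatedVolume. Condensed excerpts from the diff, regrouped for contrast (not standalone code):

    // Stub volume performing no file I/O: null is tolerable because
    // nothing in these tests ever dereferences the provider.
    @Override
    public FileIoProvider getFileIoProvider() {
      return null;
    }

    // SimulatedFSDataset: one real provider, shared with its volume.
    this.fileIoProvider = new FileIoProvider(conf);
    this.volume = new SimulatedVolume(this.storage, this.fileIoProvider);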
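Finally, the TestFsDatasetImpl hunk stubs getFileIoProvider() on the mocked DataNode before DNConf is constructed, suggesting that DNConf (or code it triggers) now consults the provider during setup. A minimal sketch of that wiring, assuming Mockito and the Hadoop classes shown in the diff:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.datanode.DNConf;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;
    import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;

    class MockDataNodeWiring {
      static DataNode newMockDataNode(Configuration conf) {
        DataNode datanode = mock(DataNode.class);
        when(datanode.getConf()).thenReturn(conf);
        // Stub the provider first, mirroring the order in the diff.
        when(datanode.getFileIoProvider())
            .thenReturn(new FileIoProvider(conf));
        when(datanode.getDnConf()).thenReturn(new DNConf(datanode));
        return datanode;
      }
    }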