This is an automated email from the ASF dual-hosted git repository. epayne pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 49d5463 HADOOP-15281. Distcp to add no-rename copy option. Contributed by Andrew Olson. 49d5463 is described below commit 49d54633e0f4bd388c00d591e90666dbb7633c9f Author: Eric E Payne <er...@verizonmedia.com> AuthorDate: Thu Feb 7 23:15:18 2019 +0000 HADOOP-15281. Distcp to add no-rename copy option. Contributed by Andrew Olson. --- .../fs/contract/s3a/ITestS3AContractDistCp.java | 33 +++++++++++ .../org/apache/hadoop/tools/DistCpConstants.java | 3 +- .../org/apache/hadoop/tools/DistCpContext.java | 4 ++ .../apache/hadoop/tools/DistCpOptionSwitch.java | 14 ++++- .../org/apache/hadoop/tools/DistCpOptions.java | 19 ++++++ .../org/apache/hadoop/tools/OptionsParser.java | 4 +- .../org/apache/hadoop/tools/mapred/CopyMapper.java | 6 +- .../tools/mapred/RetriableFileCopyCommand.java | 53 ++++++++++++----- .../hadoop-distcp/src/site/markdown/DistCp.md.vm | 6 +- .../org/apache/hadoop/tools/TestDistCpOptions.java | 5 +- .../tools/contract/AbstractContractDistCpTest.java | 68 +++++++++++++++++++++- 11 files changed, 192 insertions(+), 23 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java index b3d511e..740f256 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.contract.s3a; +import java.io.FileNotFoundException; import java.io.IOException; import static org.apache.hadoop.fs.s3a.Constants.*; @@ -26,6 +27,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageStatistics; import 
org.apache.hadoop.fs.s3a.FailureInjectionPolicy; import org.apache.hadoop.tools.contract.AbstractContractDistCpTest; @@ -74,4 +76,35 @@ public class ITestS3AContractDistCp extends AbstractContractDistCpTest { Path path = super.path(filepath); return new Path(path, FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING); } + + @Override + public void testDirectWrite() throws Exception { + resetStorageStatistics(); + super.testDirectWrite(); + assertEquals("Expected no renames for a direct write distcp", 0L, + getRenameOperationCount()); + } + + @Override + public void testNonDirectWrite() throws Exception { + resetStorageStatistics(); + try { + super.testNonDirectWrite(); + } catch (FileNotFoundException e) { + // We may get this exception when data is written to a DELAY_LISTING_ME + // directory causing verification of the distcp success to fail if + // S3Guard is not enabled + } + assertEquals("Expected 2 renames for a non-direct write distcp", 2L, + getRenameOperationCount()); + } + + private void resetStorageStatistics() { + getFileSystem().getStorageStatistics().reset(); + } + + private long getRenameOperationCount() { + return getFileSystem().getStorageStatistics() + .getLong(StorageStatistics.CommonStatisticNames.OP_RENAME); + } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 4946091..e20f206 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -85,7 +85,8 @@ public final class DistCpConstants { "distcp.dynamic.min.records_per_chunk"; public static final String CONF_LABEL_SPLIT_RATIO = "distcp.dynamic.split.ratio"; - + public static final String CONF_LABEL_DIRECT_WRITE = "distcp.direct.write"; + /* Total bytes to be copied. Updated by copylisting. 
Unfiltered count */ public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected"; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java index fc047ca..1e63d80 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java @@ -179,6 +179,10 @@ public class DistCpContext { return options.getCopyBufferSize(); } + public boolean shouldDirectWrite() { + return options.shouldDirectWrite(); + } + public void setTargetPathExists(boolean targetPathExists) { this.targetPathExists = targetPathExists; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index e57e413..49ffc59 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -223,7 +223,19 @@ public enum DistCpOptionSwitch { */ FILTERS(DistCpConstants.CONF_LABEL_FILTERS_FILE, new Option("filters", true, "The path to a file containing a list of" - + " strings for paths to be excluded from the copy.")); + + " strings for paths to be excluded from the copy.")), + + /** + * Write directly to the final location, avoiding the creation and rename + * of temporary files. + * This is typically useful in cases where the target filesystem + * implementation does not support atomic rename operations, such as with + * the S3AFileSystem which translates file renames to potentially very + * expensive copy-then-delete operations. 
+ */ + DIRECT_WRITE(DistCpConstants.CONF_LABEL_DIRECT_WRITE, + new Option("direct", false, "Write files directly to the" + + " target location, avoiding temporary file rename.")); public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct"; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index cff04eb..f5a72bf 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -155,6 +155,9 @@ public final class DistCpOptions { private final int copyBufferSize; + /** Whether data should be written directly to the target paths. */ + private final boolean directWrite; + /** * File attributes for preserve. * @@ -216,6 +219,8 @@ public final class DistCpOptions { this.copyBufferSize = builder.copyBufferSize; this.verboseLog = builder.verboseLog; this.trackPath = builder.trackPath; + + this.directWrite = builder.directWrite; } public Path getSourceFileListing() { @@ -343,6 +348,10 @@ public final class DistCpOptions { return trackPath; } + public boolean shouldDirectWrite() { + return directWrite; + } + /** * Add options to configuration. 
These will be used in the Mapper/committer * @@ -391,6 +400,8 @@ public final class DistCpOptions { DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.NUM_LISTSTATUS_THREADS, Integer.toString(numListstatusThreads)); } + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIRECT_WRITE, + String.valueOf(directWrite)); } /** @@ -427,6 +438,7 @@ public final class DistCpOptions { ", blocksPerChunk=" + blocksPerChunk + ", copyBufferSize=" + copyBufferSize + ", verboseLog=" + verboseLog + + ", directWrite=" + directWrite + '}'; } @@ -476,6 +488,8 @@ public final class DistCpOptions { private int copyBufferSize = DistCpConstants.COPY_BUFFER_SIZE_DEFAULT; + private boolean directWrite = false; + public Builder(List<Path> sourcePaths, Path targetPath) { Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(), "Source paths should not be null or empty!"); @@ -728,6 +742,11 @@ public final class DistCpOptions { this.verboseLog = newVerboseLog; return this; } + + public Builder withDirectWrite(boolean newDirectWrite) { + this.directWrite = newDirectWrite; + return this; + } } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 668b594..ef0017b 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -113,7 +113,9 @@ public class OptionsParser { .withBlocking( !command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch())) .withVerboseLog( - command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch())); + command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch())) + .withDirectWrite( + command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch())); if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) { String[] snapshots = getVals(command, diff --git 
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index faa4aa2..6b0f1f1 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -84,6 +84,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text> private boolean overWrite = false; private boolean append = false; private boolean verboseLog = false; + private boolean directWrite = false; private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class); private FileSystem targetFS = null; @@ -111,6 +112,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text> DistCpOptionSwitch.VERBOSE_LOG.getConfigLabel(), false); preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch. PRESERVE_STATUS.getConfigLabel())); + directWrite = conf.getBoolean( + DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false); targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); Path targetFinalPath = new Path(conf.get( @@ -253,7 +256,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text> long bytesCopied; try { bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description, - action).execute(sourceFileStatus, target, context, fileAttributes); + action, directWrite).execute(sourceFileStatus, target, context, + fileAttributes); } catch (Exception e) { context.setStatus("Copy Failure: " + sourceFileStatus.getPath()); throw new IOException("File copy failed: " + sourceFileStatus.getPath() + diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 55f90d0..25c1de1 100644 --- 
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -55,6 +55,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class); private boolean skipCrc = false; + private boolean directWrite = false; private FileAction action; /** @@ -80,6 +81,21 @@ public class RetriableFileCopyCommand extends RetriableCommand { } /** + * Create a RetriableFileCopyCommand. + * + * @param skipCrc Whether to skip the crc check. + * @param description A verbose description of the copy operation. + * @param action We should overwrite the target file or append new data to it. + * @param directWrite Whether to write directly to the target path, avoiding a + * temporary file rename. + */ + public RetriableFileCopyCommand(boolean skipCrc, String description, + FileAction action, boolean directWrite) { + this(skipCrc, description, action); + this.directWrite = directWrite; + } + + /** * Implementation of RetriableCommand::doExecute(). * This is the actual copy-implementation. * @param arguments Argument-list to the command. @@ -102,16 +118,20 @@ public class RetriableFileCopyCommand extends RetriableCommand { private long doCopy(CopyListingFileStatus source, Path target, Mapper.Context context, EnumSet<FileAttribute> fileAttributes) throws IOException { + LOG.info("Copying " + source.getPath() + " to " + target); + final boolean toAppend = action == FileAction.APPEND; - Path targetPath = toAppend ? target : getTmpFile(target, context); + final boolean useTempTarget = !toAppend && !directWrite; + Path targetPath = useTempTarget ? getTempFile(target, context) : target; + + LOG.info("Writing to " + + (useTempTarget ? 
"temporary" : "direct") + + " target file path " + targetPath); + final Configuration configuration = context.getConfiguration(); FileSystem targetFS = target.getFileSystem(configuration); try { - if (LOG.isDebugEnabled()) { - LOG.debug("Copying " + source.getPath() + " to " + target); - LOG.debug("Target file path: " + targetPath); - } final Path sourcePath = source.getPath(); final FileSystem sourceFS = sourcePath.getFileSystem(configuration); final FileChecksum sourceChecksum = fileAttributes @@ -134,17 +154,20 @@ public class RetriableFileCopyCommand extends RetriableCommand { targetFS, targetPath); } } - // it's not append case, thus we first write to a temporary file, rename - // it to the target path. - if (!toAppend) { + // it's not append or direct write (preferred for s3a) case, thus we first + // write to a temporary file, then rename it to the target path. + if (useTempTarget) { + LOG.info("Renaming temporary target file path " + targetPath + + " to " + target); promoteTmpToTarget(targetPath, target, targetFS); } + LOG.info("Completed writing " + target + " (" + bytesRead + " bytes)"); return bytesRead; } finally { // note that for append case, it is possible that we append partial data // and then fail. In that case, for the next retry, we either reuse the // partial appended data if it is good or we overwrite the whole file - if (!toAppend) { + if (useTempTarget) { targetFS.delete(targetPath, false); } } @@ -252,14 +275,16 @@ public class RetriableFileCopyCommand extends RetriableCommand { } } - private Path getTmpFile(Path target, Mapper.Context context) { + private Path getTempFile(Path target, Mapper.Context context) { Path targetWorkPath = new Path(context.getConfiguration(). get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); - Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath; - LOG.info("Creating temp file: " + - new Path(root, ".distcp.tmp." 
+ context.getTaskAttemptID().toString())); - return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString()); + Path root = target.equals(targetWorkPath) ? targetWorkPath.getParent() + : targetWorkPath; + Path tempFile = new Path(root, ".distcp.tmp." + + context.getTaskAttemptID().toString()); + LOG.info("Creating temp file: " + tempFile); + return tempFile; } @VisibleForTesting diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm index b855422..25ea7e2 100644 --- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm +++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm @@ -241,6 +241,7 @@ Flag | Description | Notes `-blocksperchunk <blocksperchunk>` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `<blocksperchunk>` blocks to be transferred in parallel, and reassembled on the destination. By default, `<blocksperchunk>` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements [...] `-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B | `-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option. +`-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store Architecture of DistCp ---------------------- @@ -455,7 +456,7 @@ configuration, or be otherwise available in all cluster hosts. 
DistCp can be used to upload data ```bash -hadoop distcp hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1 +hadoop distcp -direct hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1 ``` To download data @@ -535,6 +536,9 @@ rely on disk buffering. Copies each byte down to the Hadoop worker nodes and back to the bucket. As well as being slow, it means that charges may be incurred. +* The `-direct` option can be used to write to object store target paths directly, +avoiding the potentially very expensive temporary file rename operations that would +otherwise occur. Frequently Asked Questions -------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java index 62a2e6d..7382795 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java @@ -287,8 +287,9 @@ public class TestDistCpOptions { "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, " + "mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], " + "atomicWorkPath=null, logPath=null, sourceFileListing=abc, " + - "sourcePaths=null, targetPath=xyz, filtersFile='null'," + - " blocksPerChunk=0, copyBufferSize=8192, verboseLog=false}"; + "sourcePaths=null, targetPath=xyz, filtersFile='null', " + + "blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " + + "directWrite=false}"; String optionString = option.toString(); Assert.assertEquals(val, optionString); Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java index 1458991..c8a1d7e 100644 --- 
a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java @@ -552,7 +552,7 @@ public abstract class AbstractContractDistCpTest /** * Run the distcp job. - * @param optons distcp options + * @param options distcp options * @return the job. It will have already completed. * @throws Exception failure */ @@ -586,4 +586,68 @@ public abstract class AbstractContractDistCpTest private static void mkdirs(FileSystem fs, Path dir) throws Exception { assertTrue("Failed to mkdir " + dir, fs.mkdirs(dir)); } -} + + @Test + public void testDirectWrite() throws Exception { + describe("copy file from local to remote using direct write option"); + directWrite(localFS, localDir, remoteFS, remoteDir, true); + } + + @Test + public void testNonDirectWrite() throws Exception { + describe("copy file from local to remote without using direct write " + + "option"); + directWrite(localFS, localDir, remoteFS, remoteDir, false); + } + + /** + * Executes a test with support for using direct write option. 
+ * + * @param srcFS source FileSystem + * @param srcDir source directory + * @param dstFS destination FileSystem + * @param dstDir destination directory + * @param directWrite whether to use the -direct option + * @throws Exception if there is a failure + */ + private void directWrite(FileSystem srcFS, Path srcDir, FileSystem dstFS, + Path dstDir, boolean directWrite) throws Exception { + initPathFields(srcDir, dstDir); + + // Create 2 test files + mkdirs(srcFS, inputSubDir1); + byte[] data1 = dataset(64, 33, 43); + createFile(srcFS, inputFile1, true, data1); + byte[] data2 = dataset(200, 43, 53); + createFile(srcFS, inputFile2, true, data2); + Path target = new Path(dstDir, "outputDir"); + if (directWrite) { + runDistCpDirectWrite(inputDir, target); + } else { + runDistCp(inputDir, target); + } + ContractTestUtils.assertIsDirectory(dstFS, target); + lsR("Destination tree after distcp", dstFS, target); + + // Verify copied file contents + verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1); + verifyFileContents(dstFS, new Path(target, "inputDir/subDir1/file2"), + data2); + } + + /** + * Run distcp -direct srcDir destDir. + * @param srcDir local source directory + * @param destDir remote destination directory + * @return the completed job + * @throws Exception any failure. + */ + private Job runDistCpDirectWrite(final Path srcDir, final Path destDir) + throws Exception { + describe("\nDistcp -direct from " + srcDir + " to " + destDir); + return runDistCp(buildWithStandardOptions( + new DistCpOptions.Builder( + Collections.singletonList(srcDir), destDir) + .withDirectWrite(true))); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org