[ https://issues.apache.org/jira/browse/MAPREDUCE-7341?focusedWorklogId=742137&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-742137 ]
ASF GitHub Bot logged work on MAPREDUCE-7341: --------------------------------------------- Author: ASF GitHub Bot Created on: 16/Mar/22 08:58 Start Date: 16/Mar/22 08:58 Worklog Time Spent: 10m Work Description: mukund-thakur commented on a change in pull request #2971: URL: https://github.com/apache/hadoop/pull/2971#discussion_r827666030 ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/LoadManifestsStage.java ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.util.functional.TaskPool; + +import static org.apache.commons.io.FileUtils.byteCountToDisplaySize; +import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_MANIFEST_FILE_SIZE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_ALL_MANIFESTS; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_LOAD_MANIFESTS; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.maybeAddIOStatistics; + +/** + * Stage to load all the task manifests in the job attempt directory. + * Invoked in Job Commit. + * Manifests are loaded in parallel. + * The IOStatistics snapshot passed in is built up with the statistics + * and the statistics stripped from the manifest if prune == true. + * This keeps the memory footprint of each manifest down. + */ +public class LoadManifestsStage extends + AbstractJobCommitStage<Boolean, LoadManifestsStage.Result> { + + private static final Logger LOG = LoggerFactory.getLogger( + LoadManifestsStage.class); + + /** + * Summary of manifest loading. + */ + private final SummaryInfo summaryInfo = new SummaryInfo(); + + /** + * Should manifests be pruned of IOStatistics? 
+ */ + private boolean pruneManifests; + + /** + * List of loaded manifests. + */ + private final List<TaskManifest> manifests = new ArrayList<>(); + + public LoadManifestsStage(final StageConfig stageConfig) { + super(false, stageConfig, OP_STAGE_JOB_LOAD_MANIFESTS, true); + } + + /** + * Load the manifests. + * @param prune should manifests be pruned of IOStatistics? + * @return the summary and a list of manifests. + * @throws IOException IO failure. + */ + @Override + protected LoadManifestsStage.Result executeStage( + final Boolean prune) throws IOException { + + final Path manifestDir = getTaskManifestDir(); + LOG.info("{}: Executing Manifest Job Commit with manifests in {}", + getName(), + manifestDir); + pruneManifests = prune; + // build a list of all task manifests successfully committed + // + msync(manifestDir); + final RemoteIterator<FileStatus> manifestFiles = listManifests(); + + final List<TaskManifest> manifestList = loadAllManifests(manifestFiles); + LOG.info("{}: Summary of {} manifests loaded in {}: {}", + getName(), + manifestList.size(), + manifestDir, + summaryInfo); + + // collect any stats + maybeAddIOStatistics(getIOStatistics(), manifestFiles); + return new LoadManifestsStage.Result(summaryInfo, manifestList); + } + + /** + * Load all the manifests. + * @param manifestFiles list of manifest files. + * @return the loaded manifests. + * @throws IOException IO Failure. + */ + private List<TaskManifest> loadAllManifests( + final RemoteIterator<FileStatus> manifestFiles) throws IOException { + + trackDurationOfInvocation(getIOStatistics(), OP_LOAD_ALL_MANIFESTS, () -> + TaskPool.foreach(manifestFiles) + .executeWith(getIOProcessors()) + .stopOnFailure() + .run(this::processOneManifest)); + return manifests; + } + + /** + * Method invoked to process one manifest. + * @param status file to process. + * @throws IOException failure to load/parse + */ + private void processOneManifest(FileStatus status) + throws IOException { + updateAuditContext(OP_LOAD_ALL_MANIFESTS); + + TaskManifest m = fetchTaskManifest(status); + progress(); + + // update the manifest list in a synchronized block. + + synchronized (manifests) { + manifests.add(m); Review comment: One thing to notice here is manifests can grow in size and cause OOM. ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/AuditingIntegration.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
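A minimal sketch of the memory pattern behind the OOM concern above, using hypothetical Manifest/Stats types rather than the committer's real TaskManifest and IOStatistics classes: folding each manifest's statistics into one shared aggregate and then dropping them keeps the per-entry footprint small, though the list itself still grows with task count.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for TaskManifest and its statistics payload.
class Stats {
  long[] counters = new long[1024];
}

class Manifest {
  Stats stats = new Stats();
}

public class ManifestPruneSketch {
  private final List<Manifest> manifests = new ArrayList<>();
  private final Stats aggregate = new Stats();

  void processOneManifest(Manifest m, boolean prune) {
    if (prune) {
      synchronized (aggregate) {
        // fold the per-manifest statistics into the shared aggregate
        for (int i = 0; i < aggregate.counters.length; i++) {
          aggregate.counters[i] += m.stats.counters[i];
        }
      }
      // drop the statistics to cut the retained heap of each entry
      m.stats = null;
    }
    synchronized (manifests) {
      manifests.add(m); // still O(task count): the reviewer's OOM point stands
    }
  }
}
{code}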
+ */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.audit.CommonAuditContext; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConfig; + +import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_JOB_ID; +import static org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.CONTEXT_ATTR_STAGE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.CONTEXT_ATTR_TASK_ATTEMPT_ID; + +/** + * Helper class to support integration with Hadoop 3.3.2+ Auditing. + * This MUST BE the sole place where fs.audit methods are used, so can be replaced + * by a stub class on any backport. + */ +@InterfaceAudience.Private +public final class AuditingIntegration { + private AuditingIntegration() { + } + + /** + * Add jobID to current context; also + * task attempt ID if set. + */ + public static void updateCommonContextOnCommitterEntry( + ManifestCommitterConfig committerConfig) { + CommonAuditContext context = currentAuditContext(); + context.put(PARAM_JOB_ID, + committerConfig.getJobUniqueId()); + // maybe the task attempt ID. + if (!committerConfig.getTaskAttemptId().isEmpty()) { + context.put(CONTEXT_ATTR_TASK_ATTEMPT_ID, + committerConfig.getTaskAttemptId()); + } + } + + /** + * Callback on stage entry. + * Sets the sactiveStage and updates the Review comment: nit : typo sactiveStage ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/TaskAttemptScanDirectoryStage.java ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.LongSummaryStatistics; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.util.DurationInfo; + +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_DIRECTORY_COUNT_MEAN; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_DIRECTORY_DEPTH_MEAN; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_FILE_COUNT_MEAN; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_FILE_SIZE_MEAN; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_SCAN_DIRECTORY; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry.dirEntry; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createTaskManifest; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.maybeAddIOStatistics; + +/** + * Stage to scan a directory tree and build a task manifest. + * This is executed by the task committer. + */ +public final class TaskAttemptScanDirectoryStage + extends AbstractJobCommitStage<Void, TaskManifest> { + + private static final Logger LOG = LoggerFactory.getLogger( + TaskAttemptScanDirectoryStage.class); + + public TaskAttemptScanDirectoryStage( + final StageConfig stageConfig) { + super(true, stageConfig, OP_STAGE_TASK_SCAN_DIRECTORY, false); + } + + /** + * Build the Manifest. + * @return the manifest + * @throws IOException failure. + */ + @Override + protected TaskManifest executeStage(final Void arguments) + throws IOException { + + final Path taskAttemptDir = getRequiredTaskAttemptDir(); + final TaskManifest manifest = createTaskManifest(getStageConfig()); + + LOG.info("{}: scanning directory {}", + getName(), taskAttemptDir); + + final int depth = scanDirectoryTree(manifest, taskAttemptDir, + getDestinationDir(), + 0); + List<FileOrDirEntry> filesToCommit = manifest.getFilesToCommit(); + LongSummaryStatistics fileSummary = filesToCommit.stream() + .mapToLong(FileOrDirEntry::getSize) + .summaryStatistics(); + long fileDataSize = fileSummary.getSum(); + long fileCount = fileSummary.getCount(); + int dirCount = manifest.getDirectoriesToCreate().size(); + LOG.info("{}: directory {} contained {} file(s); data size {}", + getName(), + taskAttemptDir, + fileCount, + fileDataSize); + LOG.info("{}: Directory count = {}; maximum depth {}", + getName(), + dirCount, + depth); + // add statistics about the task output which, when aggregated, provides + // insight into structure of job, task skew, etc. 
+ IOStatisticsStore iostats = getIOStatistics(); + iostats.addSample(COMMITTER_TASK_DIRECTORY_COUNT_MEAN, dirCount); + iostats.addSample(COMMITTER_TASK_DIRECTORY_DEPTH_MEAN, depth); + iostats.addSample(COMMITTER_TASK_FILE_COUNT_MEAN, fileCount); + iostats.addSample(COMMITTER_TASK_FILE_SIZE_MEAN, fileDataSize); + + return manifest; + } + + /** + * Recursively scan a directory tree. + * The manifest will contain all files to rename + * (source and dest) and directories to create. + * All files are processed before any of the subdirs are. + * This helps in statistics gathering. + * There are some optimizations which could be done with async + * fetching of the iterators of those subdirs, but as this + * is generally off the critical path, that "enhancement" + * can be postponed until data suggests this needs improvement. + * @param manifest manifest to update + * @param srcDir dir to scan + * @param destDir destination directory + * @param depth depth from the task attempt dir. + * @return the maximum depth of child directories + * @throws IOException IO failure. + */ + private int scanDirectoryTree( + TaskManifest manifest, + Path srcDir, + Path destDir, + int depth) throws IOException { + + // generate some task progress in case directory scanning is very slow. + progress(); + + int maxDepth = 0; + int files = 0; + List<FileStatus> subdirs = new ArrayList<>(); + try (DurationInfo ignored = new DurationInfo(LOG, false, + "Task Attempt %s source dir %s, dest dir %s", + getTaskAttemptId(), srcDir, destDir)) { + + // list the directory. This may block until the listing is complete, + // or, if the FS does incremental or asynchronous fetching, until the + // first page of results is ready. + final RemoteIterator<FileStatus> listing = listStatusIterator(srcDir); Review comment: This is an async paged list request, and it is non-recursive. ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/LoadManifestsStage.java ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
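The paging behaviour noted in the review comment above can be shown against the real org.apache.hadoop.fs.RemoteIterator interface; the draining loop below is illustrative, not committer code.

{code:java}
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.RemoteIterator;

public final class ListingSketch {

  private ListingSketch() {
  }

  /**
   * Drain a listing iterator. For stores with paged listings, later pages
   * are fetched on demand inside hasNext()/next(), so obtaining the
   * iterator may block only until the first page of results is ready.
   */
  static long countEntries(RemoteIterator<FileStatus> listing)
      throws IOException {
    long count = 0;
    while (listing.hasNext()) {
      listing.next(); // may trigger the fetch of the next page
      count++;
    }
    return count;
  }
}
{code}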
+ */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.util.functional.TaskPool; + +import static org.apache.commons.io.FileUtils.byteCountToDisplaySize; +import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_ALL_MANIFESTS; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_LOAD_MANIFESTS; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.maybeAddIOStatistics; + +/** + * Stage to load all the task manifests in the job attempt directory. + * Invoked in Job Commit. + * Manifests are loaded in parallel. + * The IOStatistics snapshot passed in is built up with the statistics + * and the statistics stripped from the manifest if prune == true. + * This keeps the memory footprint of each manifest down. + */ +public class LoadManifestsStage extends + AbstractJobCommitStage<Boolean, LoadManifestsStage.Result> { + + private static final Logger LOG = LoggerFactory.getLogger( + LoadManifestsStage.class); + + /** + * Summary of manifest loading. + */ + private final SummaryInfo summaryInfo = new SummaryInfo(); + + /** + * Should manifests be pruned of IOStatistics? + */ + private boolean pruneManifests; + + /** + * List of loaded manifests. + */ + private final List<TaskManifest> manifests = new ArrayList<>(); + + public LoadManifestsStage(final StageConfig stageConfig) { + super(false, stageConfig, OP_STAGE_JOB_LOAD_MANIFESTS, true); + } + + /** + * Load the manifests. + * @param prune should manifests be pruned of IOStatistics? + * @return the summary and a list of manifests. + * @throws IOException IO failure. + */ + @Override + protected LoadManifestsStage.Result executeStage( + final Boolean prune) throws IOException { + + final Path manifestDir = getTaskManifestDir(); + LOG.info("{}: Executing Manifest Job Commit with manifests in {}", + getName(), + manifestDir); + pruneManifests = prune; + // build a list of all task manifests successfully committed + // + msync(manifestDir); + final RemoteIterator<FileStatus> manifestFiles = listManifests(); + + final List<TaskManifest> manifestList = loadAllManifests(manifestFiles); + LOG.info("{}: Summary of {} manifests loaded in {}: {}", + getName(), + manifestList.size(), + manifestDir, + summaryInfo); + + // collect any stats + maybeAddIOStatistics(getIOStatistics(), manifestFiles); + return new LoadManifestsStage.Result(summaryInfo, manifestList); + } + + /** + * Load all the manifests. + * @param manifestFiles list of manifest files. + * @return the loaded manifests. + * @throws IOException IO Failure. 
+ */ + private List<TaskManifest> loadAllManifests( + final RemoteIterator<FileStatus> manifestFiles) throws IOException { + + trackDurationOfInvocation(getIOStatistics(), OP_LOAD_ALL_MANIFESTS, () -> + TaskPool.foreach(manifestFiles) + .executeWith(getIOProcessors()) + .stopOnFailure() Review comment: I think an exception in any thread will stop the execution and will be thrown from the method, and finally a stage failure is reported. ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobCommitStage.java ########## @@ -0,0 +1,1009 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperations; +import org.apache.hadoop.util.OperationDuration; +import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.util.functional.CallableRaisingIOE; +import org.apache.hadoop.util.functional.RemoteIterators; +import org.apache.hadoop.util.functional.TaskPool; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_IS_FILE; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_RENAME; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.createTracker; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; +import static
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.IO_ACQUIRE_READ_PERMIT_BLOCKED; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.IO_ACQUIRE_WRITE_PERMIT_BLOCKED; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MSYNC; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.enterStageWorker; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_GET_FILE_STATUS; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_LIST; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_OPEN_FILE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_COMMIT_FILE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_CREATE_FILE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_DELETE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_MKDIR; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_RENAME; + +/** + * A Stage in Task/Job Commit. + * A stage can be executed once only, creating the return value of the + * {@link #apply(Object)} method, and, potentially, updating the state of the + * store via {@link StoreOperations}. + * IOStatistics will also be updated. + * Stages are expected to be combined to form the commit protocol. + * @param <IN> Type of arguments to the stage. + * @param <OUT> Type of result. + */ +public abstract class AbstractJobCommitStage<IN, OUT> Review comment: I agree with the name change, as there are methods here that are called from multiple stages. ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java ########## @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
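The stopOnFailure() behaviour discussed in the LoadManifestsStage review comment above can be sketched with plain java.util.concurrent primitives; this shows only the shape of the contract, not TaskPool's actual implementation.

{code:java}
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public final class StopOnFailureSketch {

  private StopOnFailureSketch() {
  }

  interface IOTask<T> {
    void run(T item) throws IOException;
  }

  /** Run tasks in parallel; skip remaining work after the first failure. */
  static <T> void foreach(List<T> items, IOTask<T> task)
      throws IOException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    AtomicBoolean failed = new AtomicBoolean(false);
    AtomicReference<IOException> firstFailure = new AtomicReference<>();
    try {
      for (T item : items) {
        pool.submit(() -> {
          if (failed.get()) {
            return; // stop-on-failure: later items are skipped
          }
          try {
            task.run(item);
          } catch (IOException e) {
            failed.set(true);
            firstFailure.compareAndSet(null, e); // keep the first exception
          }
        });
      }
    } finally {
      pool.shutdown();
      pool.awaitTermination(1, TimeUnit.MINUTES);
    }
    if (firstFailure.get() != null) {
      // rethrown to the caller; the stage wrapper then records the failure
      throw firstFailure.get();
    }
  }
}
{code}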
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.EntryStatus; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.util.functional.TaskPool; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.measureDurationOfInvocation; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_CREATE_DIRECTORIES; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_FILE_UNDER_DESTINATION; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MKDIRS_RETURNED_FALSE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_PREPARE_DIR_ANCESTORS; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CREATE_TARGET_DIRS; +import static org.apache.hadoop.util.OperationDuration.humanTime; + +/** + * Prepare the destination directory tree, as efficiently as possible, + * doing those IO operations in the thread pool. + * + * The classic FileOutputCommitter does a recursive treewalk and + * deletes any files found at paths where directories are to be created. + * + * Each task manifest's directories are combined with those of the other tasks + * to build a set of all directories which are needed, without duplicates. + * + * This stage requires the aggregate set of manifests to contain + * all directories to create, including level, + * and expects them to have been probed for existence/state. + * + * For each level, all dirs are processed in parallel to + * be created or, if files, deleted. + * + * The stage returns the list of directories created, and for testing, + * the map of paths to outcomes. + * + * Directory creation can be surprisingly slow against object stores; + * do use benchmarks from real test runs when tuning this algorithm. + */ +public class CreateOutputDirectoriesStage extends + AbstractJobCommitStage<List<TaskManifest>, CreateOutputDirectoriesStage.Result> { + + private static final Logger LOG = LoggerFactory.getLogger( + CreateOutputDirectoriesStage.class); + + /** + * Directories as a map of (path, path).
+ * Using a map rather than any set for efficient concurrency; the + * concurrent sets don't do lookups as fast. + */ + private final Map<Path, DirMapState> dirMap = new ConcurrentHashMap<>(); + + /** + * A list of created paths for the results. + */ + private final List<Path> createdDirectories = new ArrayList<>(); + + public CreateOutputDirectoriesStage(final StageConfig stageConfig) { + super(false, stageConfig, OP_STAGE_JOB_CREATE_TARGET_DIRS, true); + // add the dest dir to the dir map as we expect the job setup to create it. + dirMap.put(getDestinationDir(), DirMapState.dirWasCreated); + } + + @Override + protected Result executeStage( + final List<TaskManifest> taskManifests) + throws IOException { + + final List<Path> directories = createAllDirectories(taskManifests); + LOG.debug("{}: Created {} directories", getName(), directories.size()); + return new Result(new HashSet<>(directories), dirMap); + } + + /** + * For each task, build the list of directories it wants. + * @param taskManifests task manifests + * @return the list of paths which have been created. + */ + private List<Path> createAllDirectories(final List<TaskManifest> taskManifests) + throws IOException { + + // all directories which need to exist across all + // tasks. + // leaf directories + final Map<Path, DirEntry> leaves = new HashMap<>(); + // parent directories. these do not need to be + // explicitly created. + final Map<Path, DirEntry> parents = new HashMap<>(); + // the files which must be deleted as a directory + // will be created at that path. + final Set<Path> filesToDelete = new HashSet<>(); + + // iterate through the task manifests + // and add all output dirs into the set of dirs to + // create. + // hopefully there is a lot of overlap, so the + // final number of dirs to create is small. + for (TaskManifest task: taskManifests) { + final List<DirEntry> destDirectories = task.getDestDirectories(); + Collections.sort(destDirectories, (o1, o2) -> + o1.getLevel() - o2.getLevel()); + for (DirEntry entry: destDirectories) { + // add the dest entry + final Path path = entry.getDestPath(); + if (!leaves.containsKey(path)) { + leaves.put(path, entry); + + // if it is a file to delete, record this. + if (entry.getStatus() == EntryStatus.file) { Review comment: When will this case happen? We are only getting the directories here. ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobCommitStage.java ########## @@ -0,0 +1,942 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
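On the question above, a hedged reading: the class javadoc says the directory entries are expected "to have been probed for existence/state", so an entry can name a destination path where a file already exists (for example, leftover output of an earlier job), and that file must be deleted before mkdirs() can succeed. The types below are illustrative only, not the committer's real DirEntry/EntryStatus API.

{code:java}
import java.util.HashSet;
import java.util.Set;

// Illustrative stand-ins for the committer's DirEntry/EntryStatus classes.
public class DirPreparationSketch {

  enum EntryStatus { not_found, file, dir }

  static final class Entry {
    final String destPath;
    final EntryStatus status;

    Entry(String destPath, EntryStatus status) {
      this.destPath = destPath;
      this.status = status;
    }
  }

  /** Collect the paths where an existing file blocks a directory. */
  static Set<String> filesToDelete(Iterable<Entry> dirEntries) {
    Set<String> toDelete = new HashSet<>();
    for (Entry e : dirEntries) {
      if (e.status == EntryStatus.file) {
        toDelete.add(e.destPath); // a file occupies the directory's path
      }
    }
    return toDelete;
  }
}
{code}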
+ */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations; +import org.apache.hadoop.util.OperationDuration; +import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.util.functional.CallableRaisingIOE; +import org.apache.hadoop.util.functional.RemoteIterators; +import org.apache.hadoop.util.functional.TaskPool; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_IS_FILE; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.createTracker; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MSYNC; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.enterStageWorker; + +/** + * A Stage in Task/Job Commit. + * A stage can be executed once only, creating the return value of the + * {@link #apply(Object)} method, and, potentially, updating the state of the + * store via {@link ManifestStoreOperations}. + * IOStatistics will also be updated. + * Stages are expected to be combined to form the commit protocol. + * @param <IN> Type of arguments to the stage. + * @param <OUT> Type of result. 
+ */ +public abstract class AbstractJobCommitStage<IN, OUT> + implements JobStage<IN, OUT> { + + private static final Logger LOG = LoggerFactory.getLogger( + AbstractJobCommitStage.class); + + /** + * Error text on rename failure: {@value}. + */ + public static final String FAILED_TO_RENAME_PREFIX = "Failed to "; + + /** + * Is this a task stage? If so, toString() includes task + * info.. + */ + private final boolean isTaskStage; + + /** + * Configuration of all the stages in the ongoing committer + * operation. + */ + private final StageConfig stageConfig; + + /** + * Name of the stage for statistics and logging. + */ + private final String stageStatisticName; + + /** + * Callbacks to update store. + * This is not made visible to the stages; they must + * go through the wrapper classes in this class, which + * add statistics and logging. + */ + private final ManifestStoreOperations operations; + + /** + * Submitter for doing IO against the store. + */ + private final TaskPool.Submitter ioProcessors; + + /** + * Used to stop any re-entrancy of the rename. + * This is an execute-once operation. + */ + private final AtomicBoolean executed = new AtomicBoolean(false); + + /** + * Tracker of the duration of the execution of the stage. + * set after {@link #executeStage(Object)} completes. + */ + private DurationTracker stageExecutionTracker; + + /** + * Name for logging. + */ + private final String name; + + /** + * Constructor. + * @param isTaskStage Is this a task stage? + * @param stageConfig stage-independent configuration. + * @param stageStatisticName name of the stage for statistics/logging + * @param requireIOProcessors are the IO processors required? + */ + protected AbstractJobCommitStage( + final boolean isTaskStage, + final StageConfig stageConfig, + final String stageStatisticName, + final boolean requireIOProcessors) { + this.isTaskStage = isTaskStage; + this.stageStatisticName = stageStatisticName; + this.stageConfig = stageConfig; + requireNonNull(stageConfig.getDestinationDir(), "Destination Directory"); + requireNonNull(stageConfig.getJobId(), "Job ID"); + requireNonNull(stageConfig.getJobAttemptDir(), "Job attempt directory"); + this.operations = requireNonNull(stageConfig.getOperations(), + "Operations callbacks"); + // and the processors of work if required. + this.ioProcessors = bindProcessor( + requireIOProcessors, + stageConfig.getIoProcessors()); + String stageName; + if (isTaskStage) { + // force fast failure. + getRequiredTaskId(); + getRequiredTaskAttemptId(); + getRequiredTaskAttemptDir(); + stageName = String.format("[Task-Attempt %s]", getRequiredTaskAttemptId()); + } else { + stageName = String.format("[Job-Attempt %s/%02d]", + stageConfig.getJobId(), + stageConfig.getJobAttemptNumber()); + } + name = stageName; + } + + /** + * Bind to the processor if it is required. + * @param required is the processor required? + * @param processor processor + * @return the processor binding + * @throws NullPointerException if required == true and processor is null. + */ + private TaskPool.Submitter bindProcessor( + final boolean required, + final TaskPool.Submitter processor) { + return required + ? requireNonNull(processor, "required IO processor is null") + : null; + } + + /** + * Stage entry point. + * Verifies that this is the first and only time the stage is invoked, + * then calls {@link #executeStage(Object)} for the subclass + * to perform its part of the commit protocol. + * The duration of the stage is collected as a statistic, and its + * entry/exit logged at INFO. 
+ * @param arguments arguments to the function. + * @return the result. + * @throws IOException failures. + */ + @Override + public final OUT apply(final IN arguments) throws IOException { + executeOnlyOnce(); + progress(); + String stageName = getStageName(arguments); + getStageConfig().enterStage(stageName); + String statisticName = getStageStatisticName(arguments); + // The tracker here + LOG.info("{}: Executing Stage {}", getName(), stageName); + stageExecutionTracker = createTracker(getIOStatistics(), statisticName); + try { + // exec the input function and return its value + final OUT out = executeStage(arguments); + LOG.info("{}: Stage {} completed after {}", + getName(), + stageName, + OperationDuration.humanTime( + stageExecutionTracker.asDuration().toMillis())); + return out; + } catch (IOException | RuntimeException e) { + LOG.error("{}: Stage {} failed: after {}: {}", + getName(), + stageName, + OperationDuration.humanTime( + stageExecutionTracker.asDuration().toMillis()), + e.toString()); + LOG.debug("{}: Stage failure:", getName(), e); + // input function failed: note it + stageExecutionTracker.failed(); + // and rethrow + throw e; + } finally { + // update the tracker. + // this is called after the catch() call would have + // set the failed flag. + stageExecutionTracker.close(); + progress(); + getStageConfig().exitStage(stageName); + } + } + + /** + * The work of a stage. + * Executed exactly once. + * @param arguments arguments to the function. + * @return the result. + * @throws IOException failures. + */ + protected abstract OUT executeStage(IN arguments) throws IOException; + + /** + * Check that the operation has not been invoked twice. + * This is an atomic check. + * @throws IllegalStateException on a second invocation. + */ + private void executeOnlyOnce() { + Preconditions.checkState( + !executed.getAndSet(true), + "Stage attempted twice"); + } + + /** + * The stage statistic name. + * @param arguments args to the invocation. + * @return stage name. + */ + protected String getStageStatisticName(IN arguments) { + return stageStatisticName; + } + + /** + * Stage name for reporting; defaults to + * call {@link #getStageStatisticName(IN)}. + * @param arguments args to the invocation. + * @return name used in updating reports. + */ + protected String getStageName(IN arguments) { + return getStageStatisticName(arguments); + } + + /** + * Get the execution tracker; non-null + * after stage execution. + * @return a tracker or null. + */ + public DurationTracker getStageExecutionTracker() { + return stageExecutionTracker; + } + + /** + * Adds the duration of the job to an IOStatistics store + * (such as the manifest to be saved). + * @param iostats store + * @param statistic statistic name. + */ + public void addExecutionDurationToStatistics(IOStatisticsStore iostats, + String statistic) { + iostats.addTimedOperation( + statistic, + getStageExecutionTracker().asDuration()); + } + + /** + * Note any rate limiting to the given timing statistic. + * If the wait was 0, no statistics are updated. + * @param statistic statistic key. + * @param wait wait duration. + */ + private void noteAnyRateLimiting(String statistic, Duration wait) { + if (!wait.isZero()) { + // rate limiting took place + getIOStatistics().addTimedOperation( + statistic, + wait.toMillis()); + } + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "AbstractJobCommitStage{"); + sb.append(isTaskStage ? 
"Task Stage" : "Job Stage"); + sb.append(" name='").append(name).append('\''); + sb.append(" stage='").append(stageStatisticName).append('\''); + sb.append('}'); + return sb.toString(); + } + + /** + * The stage configuration. + * @return the stage configuration used by this stage. + */ + protected StageConfig getStageConfig() { + return stageConfig; + } + + /** + * Update the thread context with the stage name and + * job ID. + * This MUST be invoked at the start of methods invoked in helper threads, + * to ensure that they are all annotated with job and stage. + * @param stage stage name. + */ + protected void updateAuditContext(final String stage) { + enterStageWorker(stageConfig.getJobId(), stage); + } + + /** + * The IOStatistics are shared across all uses of the + * StageConfig. + * @return the (possibly shared) IOStatistics. + */ + @Override + public final IOStatisticsStore getIOStatistics() { + return stageConfig.getIOStatistics(); + } + + /** + * Call progress() on any Progressable passed in. + */ + protected final void progress() { + if (stageConfig.getProgressable() != null) { + stageConfig.getProgressable().progress(); + } + } + + /** + * Get a file status value or, if the path doesn't exist, return null. + * @param path path + * @return status or null + * @throws IOException IO Failure. + */ + protected final FileStatus getFileStatusOrNull( + final Path path) + throws IOException { + try { + return getFileStatus(path); + } catch (FileNotFoundException e) { + return null; + } + } + + /** + * Get a file status value or, if the path doesn't exist, return null. + * @param path path + * @return status or null + * @throws IOException IO Failure. + */ + protected final FileStatus getFileStatus( + final Path path) + throws IOException { + LOG.trace("{}: getFileStatus('{}')", getName(), path); + requireNonNull(path, + () -> String.format("%s: Null path for getFileStatus() call", getName())); + return trackDuration(getIOStatistics(), OP_GET_FILE_STATUS, () -> + operations.getFileStatus(path)); + } + + /** + * Get a file status value or, if the path doesn't exist, return null. + * @param path path + * @return true if the path resolves to a file + * @throws IOException IO Failure. + */ + protected final boolean isFile( + final Path path) + throws IOException { + LOG.trace("{}: isFile('{}')", getName(), path); + return trackDuration(getIOStatistics(), OP_IS_FILE, () -> { + return operations.isFile(path); + }); + } + + /** + * Delete a path. + * @param path path + * @param recursive recursive delete. + * @return status or null + * @throws IOException IO Failure. + */ + protected final boolean delete( + final Path path, + final boolean recursive) + throws IOException { + LOG.trace("{}: delete('{}, {}')", getName(), path, recursive); + return delete(path, recursive, OP_DELETE); + } + + /** + * Delete a path. + * @param path path + * @param recursive recursive delete. + * @param statistic statistic to update + * @return status or null + * @throws IOException IO Failure. + */ + protected Boolean delete( + final Path path, + final boolean recursive, + final String statistic) + throws IOException { + return trackDuration(getIOStatistics(), statistic, () -> { + return operations.delete(path, recursive); + }); + } + + /** + * Create a directory. + * @param path path + * @param escalateFailure escalate "false" to PathIOE + * @return true if the directory was created/exists. + * @throws IOException IO Failure. 
+ */ + protected final boolean mkdirs( + final Path path, + final boolean escalateFailure) + throws IOException { + LOG.trace("{}: mkdirs('{}')", getName(), path); + return trackDuration(getIOStatistics(), OP_MKDIRS, () -> { + boolean success = operations.mkdirs(path); + if (!success && escalateFailure) { + throw new PathIOException(path.toUri().toString(), + stageStatisticName + ": mkdirs() returned false"); + } + return success; + }); + + } + + /** + * List all directly files under a path. + * Async implementations may under-report their durations. + * @param path path + * @return iterator over the results. + * @throws IOException IO Failure. + */ + protected final RemoteIterator<FileStatus> listStatusIterator( + final Path path) + throws IOException { + LOG.trace("{}: listStatusIterator('{}')", getName(), path); + return trackDuration(getIOStatistics(), OP_LIST_STATUS, () -> + operations.listStatusIterator(path)); + } + + /** + * Load a manifest file. + * @param status source. + * @return the manifest. + * @throws IOException IO Failure. + */ + protected final TaskManifest loadManifest( + final FileStatus status) + throws IOException { + LOG.trace("{}: loadManifest('{}')", getName(), status); + return trackDuration(getIOStatistics(), OP_LOAD_MANIFEST, () -> + operations.loadTaskManifest( + stageConfig.currentManifestSerializer(), + status)); + } + + /** + * List all the manifests in the task manifest dir. + * @return a iterator of manifests. + * @throws IOException IO Failure. + */ + protected final RemoteIterator<FileStatus> listManifests() + throws IOException { + return RemoteIterators.filteringRemoteIterator( + listStatusIterator(getTaskManifestDir()), + st -> st.getPath().toUri().toString().endsWith(MANIFEST_SUFFIX)); + } + + /** + * Make an msync() call; swallow when unsupported. + * @param path path + * @throws IOException IO failure + */ + protected final void msync(Path path) throws IOException { + LOG.trace("{}: msync('{}')", getName(), path); + trackDurationOfInvocation(getIOStatistics(), OP_MSYNC, () -> + operations.msync(path)); + } + + /** + * Create a directory -failing if it exists or if + * mkdirs() failed. + * @param operation operation for error reporting. + * @param path path path to create. + * @return the path. + * @throws IOException failure + * @throws PathIOException mkdirs failed. + * @throws FileAlreadyExistsException destination exists. + */ + protected final Path createNewDirectory( + final String operation, + final Path path) throws IOException { + LOG.trace("{}: {} createNewDirectory('{}')", getName(), operation, path); + requireNonNull(path, + () -> String.format("%s: Null path for operation %s", getName(), operation)); + // check for dir existence before trying to create. + try { + final FileStatus status = getFileStatus(path); + // no exception, so the path exists. + throw new FileAlreadyExistsException(operation + + ": path " + path + + " already exists and has status " + status); + } catch (FileNotFoundException e) { + // the path does not exist, so create it. + mkdirs(path, true); + return path; + } + } + + /** + * Assert that a path is a directory which must exist. + * @param operation operation for error reporting. + * @param path path path to create. + * @return the path + * @throws IOException failure + * @throws PathIOException mkdirs failed. + * @throws FileAlreadyExistsException destination exists. 
+ */ + protected final Path directoryMustExist( + final String operation, + final Path path) throws IOException { + final FileStatus status = getFileStatus(path); + if (!status.isDirectory()) { + throw new PathIOException(path.toString(), + operation + + ": Path is not a directory; its status is :" + status); + } + return path; + } + + /** + * Save a task manifest or summary. This will be done by + * writing to a temp path and then renaming. + * If the destination path exists: Delete it. + * @param manifestData the manifest/success file + * @param tempPath temp path for the initial save + * @param finalPath final path for rename. + * @throws IOException failure to load/parse + */ + @SuppressWarnings("unchecked") + protected final <T extends AbstractManifestData> void save(T manifestData, + final Path tempPath, + final Path finalPath) throws IOException { + LOG.trace("{}: save('{}, {}, {}')", getName(), manifestData, tempPath, finalPath); + trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () -> + operations.save(manifestData, tempPath, true)); + renameFile(tempPath, finalPath); + } + + /** + * Get an etag from a FileStatus which MUST BE + * an implementation of EtagSource and + * whose etag MUST NOT BE null/empty. + * @param status the status; may be null. + * @return the etag or null if not provided + */ + public String getEtag(FileStatus status) { + return operations.getEtag(status); + } + + /** + * Rename a file from source to dest; if the underlying FS API call + * returned false that's escalated to an IOE. + * @param source source file. + * @param dest dest file + * @throws IOException failure + * @throws PathIOException if the rename() call returned false. + */ + protected final void renameFile(final Path source, final Path dest) + throws IOException { + maybeDeleteDest(true, dest); + executeRenamingOperation("renameFile", source, dest, + OP_RENAME_FILE, () -> + operations.renameFile(source, dest)); + } + + /** + * Rename a file from source to dest; if the underlying FS API call + * returned false that's escalated to an IOE. + * @param source source file. + * @param dest dest file + * @throws IOException failure + * @throws PathIOException if the rename() call returned false. + */ + protected final void renameDir(final Path source, final Path dest) + throws IOException { + + maybeDeleteDest(true, dest); + executeRenamingOperation("renameDir", source, dest, + OP_RENAME_FILE, () -> + operations.renameDir(source, dest) + ); + } + + /** + * Commit a file from the manifest using rename or, if available, resilient renaming. + * @param entry entry from manifest + * @throws PathIOException if the rename() call returned false and was uprated. + * @throws IOException failure + */ + protected final CommitOutcome commitFile(FileEntry entry, + boolean deleteDest) + throws IOException { + + final Path source = entry.getSourcePath(); + final Path dest = entry.getDestPath(); + + maybeDeleteDest(deleteDest, dest); + if (storeSupportsResilientCommit()) { + // get the commit permits + final ManifestStoreOperations.CommitFileResult result = trackDuration(getIOStatistics(), + OP_COMMIT_FILE_RENAME, () -> + operations.commitFile(entry)); + if (result.recovered()) { + // recovery took place. + getIOStatistics().incrementCounter(OP_COMMIT_FILE_RENAME_RECOVERED); + } + if (result.getWaitTime() != null) { + // note any delay which took place + noteAnyRateLimiting(STORE_IO_RATE_LIMITED, result.getWaitTime()); + } + } else { + // commit with a simple rename; failures will be escalated. 
+ executeRenamingOperation("renameFile", source, dest, + OP_COMMIT_FILE_RENAME, () -> + operations.renameFile(source, dest)); + } + return new CommitOutcome(); Review comment: nit: CommitOutcome is empty only. Why not just void? ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobCommitStage.java ########## @@ -0,0 +1,1009 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperations; +import org.apache.hadoop.util.OperationDuration; +import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.util.functional.CallableRaisingIOE; +import org.apache.hadoop.util.functional.RemoteIterators; +import org.apache.hadoop.util.functional.TaskPool; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_IS_FILE; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_RENAME; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.createTracker; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX; +import static 
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.IO_ACQUIRE_WRITE_PERMIT_BLOCKED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MSYNC;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.enterStageWorker;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_GET_FILE_STATUS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_LIST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_OPEN_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_COMMIT_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_CREATE_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_DELETE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_MKDIR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_RENAME;
+
+/**
+ * A Stage in Task/Job Commit.
+ * A stage can be executed once only, creating the return value of the
+ * {@link #apply(Object)} method, and, potentially, updating the state of the
+ * store via {@link StoreOperations}.
+ * IOStatistics will also be updated.
+ * Stages are expected to be combined to form the commit protocol.
+ * @param <IN> Type of arguments to the stage.
+ * @param <OUT> Type of result.
+ */
+public abstract class AbstractJobCommitStage<IN, OUT>
+    implements JobStage<IN, OUT> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbstractJobCommitStage.class);
+
+  /**
+   * Error text on rename failure: {@value}.
+   */
+  public static final String FAILED_TO_RENAME = "Failed to ";
+
+  /**
+   * Error text used when cleanup is to trash, but
+   * the FS has trash disabled: {@value}.
+   */
+  public static final String E_TRASH_DISABLED = "Trash is disabled";
+
+  /**
+   * Is this a task stage? If so, toString() includes task
+   * info.
+   */
+  private final boolean isTaskStage;
+
+  /**
+   * Configuration of all the stages in the ongoing committer
+   * operation.
+   */
+  private final StageConfig stageConfig;
+
+  /**
+   * Name of the stage for statistics and logging.
+   */
+  private final String stageStatisticName;
+
+  /**
+   * Callbacks to update the store.
+   * These are not made visible to the stages; they must
+   * go through the wrapper methods in this class, which
+   * add statistics and logging.
+   */
+  private final StoreOperations operations;
+
+  /**
+   * Submitter for doing IO against the store.
+   */
+  private final TaskPool.Submitter ioProcessors;
+
+  /**
+   * Used to stop any re-entrant execution of the stage.
+   * This is an execute-once operation.
+   */
+  private final AtomicBoolean executed = new AtomicBoolean(false);
+
+  /**
+   * Tracker of the duration of the execution of the stage.
+   * Set after {@link #executeStage(Object)} completes.
+   */
+  private DurationTracker stageExecutionTracker;
+
+  /**
+   * Name for logging.
+   */
+  private final String name;
+
+  /**
+   * Constructor.
+   * @param isTaskStage Is this a task stage?
+   * @param stageConfig stage-independent configuration.
+   * @param stageStatisticName name of the stage for statistics/logging
+   * @param requireIOProcessors are the IO processors required?
+   */
+  protected AbstractJobCommitStage(
+      final boolean isTaskStage,
+      final StageConfig stageConfig,
+      final String stageStatisticName,
+      final boolean requireIOProcessors) {
+    this.isTaskStage = isTaskStage;
+    this.stageStatisticName = stageStatisticName;
+    this.stageConfig = stageConfig;
+    requireNonNull(stageConfig.getDestinationDir(), "Destination Directory");
+    requireNonNull(stageConfig.getJobId(), "Job ID");
+    requireNonNull(stageConfig.getJobAttemptDir(), "Job attempt directory");
+    this.operations = requireNonNull(stageConfig.getOperations(),
+        "Operations callbacks");
+    // and the processors of work, if required.
+    this.ioProcessors = bindProcessor(
+        requireIOProcessors,
+        stageConfig.getIoProcessors());
+    String stageName;
+    if (isTaskStage) {
+      // force fast failure.
+      getRequiredTaskId();
+      getRequiredTaskAttemptId();
+      getRequiredTaskAttemptDir();
+      stageName = String.format("[Task-Attempt %s]", getRequiredTaskAttemptId());
+    } else {
+      stageName = String.format("[Job-Attempt %s/%02d]",
+          stageConfig.getJobId(),
+          stageConfig.getJobAttemptNumber());
+    }
+    name = stageName;
+  }
+
+  /**
+   * Bind to the processor if it is required.
+   * @param required is the processor required?
+   * @param processor processor
+   * @return the processor binding
+   * @throws NullPointerException if required == true and processor is null.
+   */
+  private TaskPool.Submitter bindProcessor(
+      final boolean required,
+      final TaskPool.Submitter processor) {
+    return required
+        ? requireNonNull(processor, "required IO processor is null")
+        : null;
+  }
+
+  /**
+   * Stage entry point.
+   * Verifies that this is the first and only time the stage is invoked,
+   * then calls {@link #executeStage(Object)} for the subclass
+   * to perform its part of the commit protocol.
+   * The duration of the stage is collected as a statistic, and its
+   * entry/exit logged at INFO.
+   * @param arguments arguments to the function.
+   * @return the result.
+   * @throws IOException failures.
+   */
+  @Override
+  public final OUT apply(final IN arguments) throws IOException {
+    executeOnlyOnce();
+    progress();
+    String stageName = getStageName(arguments);
+    getStageConfig().enterStage(stageName);
+    String statisticName = getStageStatisticName(arguments);
+    // create the tracker of the stage's execution duration
+    LOG.info("{}: Executing Stage {}", getName(), stageName);
+    stageExecutionTracker = createTracker(getIOStatistics(), statisticName);
+    try {
+      // exec the input function and return its value
+      final OUT out = executeStage(arguments);
+      LOG.info("{}: Stage {} completed after {}",
+          getName(),
+          stageName,
+          OperationDuration.humanTime(
+              stageExecutionTracker.asDuration().toMillis()));
+      return out;
+    } catch (IOException | RuntimeException e) {
+      LOG.error("{}: Stage {} failed: after {}: {}",
+          getName(),
+          stageName,
+          OperationDuration.humanTime(
+              stageExecutionTracker.asDuration().toMillis()),
+          e.toString());
+      LOG.debug("{}: Stage failure:", getName(), e);
+      // input function failed: note it
+      stageExecutionTracker.failed();
+      // and rethrow
+      throw e;
+    } finally {
+      // update the tracker.
+      // this is called after the catch() call would have
+      // set the failed flag.
+      stageExecutionTracker.close();
+      progress();
+      getStageConfig().exitStage(stageName);
+    }
+  }
+
+  /**
+   * The work of a stage.
+   * Executed exactly once.
+   * @param arguments arguments to the function.
+   * @return the result.
+   * @throws IOException failures.
+   */
+  protected abstract OUT executeStage(IN arguments) throws IOException;
+
+  /**
+   * Check that the operation has not been invoked twice.
+   * This is an atomic check.
+   * @throws IllegalStateException on a second invocation.
+   */
+  private void executeOnlyOnce() {
+    Preconditions.checkState(
+        !executed.getAndSet(true),
+        "Stage attempted twice");
+  }
+
+  /**
+   * The stage statistic name.
+   * @param arguments args to the invocation.
+   * @return stage name.
+   */
+  protected String getStageStatisticName(IN arguments) {
+    return stageStatisticName;
+  }
+
+  /**
+   * Stage name for reporting; defaults to
+   * calling {@link #getStageStatisticName(IN)}.
+   * @param arguments args to the invocation.
+   * @return name used in updating reports.
+   */
+  protected String getStageName(IN arguments) {
+    return getStageStatisticName(arguments);
+  }
+
+  /**
+   * Get the execution tracker; non-null
+   * only after stage execution.
+   * @return a tracker, or null if the stage has not yet executed.
+   */
+  public DurationTracker getStageExecutionTracker() {
+    return stageExecutionTracker;
+  }
+
+  /**
+   * Adds the duration of the stage to an IOStatistics store
+   * (such as the manifest to be saved).
+   * @param iostats store
+   * @param statistic statistic name.
+   */
+  public void addExecutionDurationToStatistics(IOStatisticsStore iostats,
+      String statistic) {
+    iostats.addTimedOperation(
+        statistic,
+        getStageExecutionTracker().asDuration());
+  }
+
+  /**
+   * Acquire a given number of read permits.
+   * The caller will block if the rate
+   * limiter mandates it.
+   * No-op if (in test setups) there's no rate limiter.
+   * Acquisition duration is added to stats when rate-limited.
+   * @param permits permit count.
+   */
+  public void acquireReadPermits(int permits) {
+    noteAnyRateLimiting(IO_ACQUIRE_READ_PERMIT_BLOCKED,
+        stageConfig.acquireReadPermits(permits));
+  }
+
+  /**
+   * Acquire a given number of write permits.
+   * The caller will block if the rate
+   * limiter mandates it.
+   * No-op if (in test setups) there's no rate limiter.
+   * Acquisition duration is added to stats when rate-limited.
+   * @param permits permit count.
+   */
+  public void acquireWritePermits(int permits) {
+    noteAnyRateLimiting(IO_ACQUIRE_WRITE_PERMIT_BLOCKED,
+        stageConfig.acquireWritePermits(permits));
+  }
+
+  /**
+   * Note any rate limiting to the given timing statistic.
+   * If the wait was 0, no statistics are updated.
+   * @param statistic statistic key.
+   * @param wait wait time in seconds.
+   */
+  private void noteAnyRateLimiting(String statistic, int wait) {
+    if (wait > 0) {
+      // rate limiting took place
+      getIOStatistics().addTimedOperation(
+          statistic,
+          wait);
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "AbstractJobCommitStage{");
+    sb.append(isTaskStage ? "Task Stage" : "Job Stage");
+    sb.append(" name='").append(name).append('\'');
+    sb.append(" stage='").append(stageStatisticName).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * The stage configuration.
+   * @return the stage configuration used by this stage.
+   */
+  protected StageConfig getStageConfig() {
+    return stageConfig;
+  }
+
+  /**
+   * Update the thread context with the stage name and
+   * job ID.
+   * This MUST be invoked at the start of methods invoked in helper threads,
+   * to ensure that they are all annotated with job and stage.
+   * @param stage stage name.
+   */
+  protected void updateAuditContext(final String stage) {
+    enterStageWorker(stageConfig.getJobId(), stage);
+  }
+
+  /**
+   * The IOStatistics are shared across all uses of the
+   * StageConfig.
+   * @return the (possibly shared) IOStatistics.
+   */
+  @Override
+  public final IOStatisticsStore getIOStatistics() {
+    return stageConfig.getIOStatistics();
+  }
+
+  /**
+   * Call progress() on any Progressable passed in.
+   */
+  protected final void progress() {
+    if (stageConfig.getProgressable() != null) {
+      stageConfig.getProgressable().progress();
+    }
+  }
+
+  /**
+   * Get a file status value or, if the path doesn't exist, return null.
+   * @param path path
+   * @return status or null
+   * @throws IOException IO Failure.
+   */
+  protected final FileStatus getFileStatusOrNull(
+      final Path path)
+      throws IOException {
+    try {
+      return getFileStatus(path);
+    } catch (FileNotFoundException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Get the status of a path; raises FileNotFoundException
+   * if the path doesn't exist.
+   * @param path path
+   * @return the file status
+   * @throws IOException IO Failure.
+   */
+  protected final FileStatus getFileStatus(
+      final Path path)
+      throws IOException {
+    LOG.trace("{}: getFileStatus('{}')", getName(), path);
+    requireNonNull(path,
+        () -> String.format("%s: Null path for getFileStatus() call", getName()));
+    acquireReadPermits(PERMIT_READ_GET_FILE_STATUS);
+    return trackDuration(getIOStatistics(), OP_GET_FILE_STATUS, () ->
+        operations.getFileStatus(path));
+  }
+
+  /**
+   * Probe a path for existing and resolving to a file.
+   * @param path path
+   * @return true if the path resolves to a file
+   * @throws IOException IO Failure.
+   */
+  protected final boolean isFile(
+      final Path path)
+      throws IOException {
+    LOG.trace("{}: isFile('{}')", getName(), path);
+    return trackDuration(getIOStatistics(), OP_IS_FILE, () -> {
+      acquireReadPermits(PERMIT_READ_GET_FILE_STATUS);
+      return operations.isFile(path);
+    });
+  }
+
+  /**
+   * Delete a path.
+   * @param path path
+   * @param recursive recursive delete.
+   * @return the outcome of the delete call.
+   * @throws IOException IO Failure.
+   */
+  protected final boolean delete(
+      final Path path,
+      final boolean recursive)
+      throws IOException {
+    LOG.trace("{}: delete('{}, {}')", getName(), path, recursive);
+    return delete(path, recursive, OP_DELETE);
+  }
+
+  /**
+   * Delete a path, updating the given statistic.
+   * @param path path
+   * @param recursive recursive delete.
+   * @param statistic statistic to update
+   * @return the outcome of the delete call.
+   * @throws IOException IO Failure.
+   */
+  protected Boolean delete(

Review comment: I think this is just refactoring; delete() is called from multiple stages.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 742137)
    Time Spent: 36h 50m  (was: 36h 40m)

> Add a task-manifest output committer for Azure and GCS
> ------------------------------------------------------
>
>                 Key: MAPREDUCE-7341
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7341
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: client
>    Affects Versions: 3.3.1
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 36h 50m
>  Remaining Estimate: 0h
>
> Add a task-manifest output committer for Azure and GCS
> The S3A committers are very popular in Spark on S3, as they are both correct
> and fast.
> The classic FileOutputCommitter v1 and v2 algorithms are all that is
> available for Azure ABFS and Google GCS, and they have limitations.
> The v2 algorithm isn't safe in the presence of failed task attempt commits,
> so we recommend the v1 algorithm for Azure. But that is slow because it
> sequentially lists then renames files and directories, one-by-one. The
> latencies of list and rename make things slow.
> Google GCS lacks the atomic directory rename required for v1 correctness;
> v2 can be used (which doesn't have the job commit performance limitations),
> but it's not safe.
> Proposed
> * Add a new FileOutputFormat committer which uses an intermediate manifest to
>   pass the list of files created by a TA to the job committer.
> * Job committer to parallelise reading these task manifests and submit all the
>   rename operations into a pool of worker threads; see the illustrative sketch
>   below. (also: mkdir, directory deletions on cleanup)
> * Use the committer plugin mechanism added for s3a to make this the default
>   committer for ABFS (i.e. no need to make any changes to FileOutputCommitter)
> * Add lots of IOStatistics instrumentation + logging of operations in the
>   JobCommit for visibility of where delays are occurring.
> * Reuse the S3A committer _SUCCESS JSON structure to publish IOStats & other
>   data for testing/support.
> This committer will be faster than the V1 algorithm because of the
> parallelisation, and because a manifest written by create-and-rename will be
> exclusive to a single task attempt, it delivers the isolation which the v2
> committer lacks.
> This is not an attempt to do an iceberg/hudi/delta-lake style manifest-only
> format for describing the contents of a table; the final output is still a
> directory tree which must be scanned during query planning.
> As such the format is still suboptimal for cloud storage, but at least we
> will have faster job execution during the commit phases.
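To make the parallelised rename proposal above concrete, here is a minimal
illustrative sketch of the fan-out/fail-fast pattern it describes. It uses a
plain ExecutorService rather than the patch's TaskPool/StoreOperations
wrappers; the ParallelRenameSketch class, its renameAll() helper, and their
parameters are hypothetical names invented for this example, not part of the
patch:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;

/** Illustrative only: rename many files in parallel, failing on any error. */
public final class ParallelRenameSketch {

  static void renameAll(FileSystem fs, Map<Path, Path> renames, int threads)
      throws IOException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      // one rename task per (source, dest) pair
      List<Callable<Void>> tasks = new ArrayList<>();
      for (Map.Entry<Path, Path> e : renames.entrySet()) {
        tasks.add(() -> {
          // escalate a false return to an exception, as the committer's
          // rename wrappers do.
          if (!fs.rename(e.getKey(), e.getValue())) {
            throw new PathIOException(e.getKey().toString(),
                "Failed to rename to " + e.getValue());
          }
          return null;
        });
      }
      // invokeAll() blocks until every rename has finished.
      for (Future<Void> future : pool.invokeAll(tasks)) {
        try {
          future.get();
        } catch (ExecutionException ex) {
          Throwable cause = ex.getCause();
          throw cause instanceof IOException
              ? (IOException) cause
              : new IOException(cause);
        }
      }
    } finally {
      pool.shutdownNow();
    }
  }
}

In the committer itself, these operations additionally flow through rate
limiting, IOStatistics duration tracking, and audit-context updates, which is
why they are funnelled through the wrapper methods of AbstractJobCommitStage
rather than invoked on the FileSystem directly; the sketch shows only the
fan-out and fail-fast shape of the job commit.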
>
> Note: this will also work on HDFS, where again, it should be faster than
> the v1 committer. However, the target is very much Spark with ABFS and GCS;
> no plans to worry about MR, as that simplifies the challenge of dealing with
> job restart (i.e. you don't have to).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org