[ https://issues.apache.org/jira/browse/MAPREDUCE-7341?focusedWorklogId=742298&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-742298 ]
ASF GitHub Bot logged work on MAPREDUCE-7341: --------------------------------------------- Author: ASF GitHub Bot Created on: 16/Mar/22 13:11 Start Date: 16/Mar/22 13:11 Worklog Time Spent: 10m Work Description: steveloughran commented on a change in pull request #2971: URL: https://github.com/apache/hadoop/pull/2971#discussion_r827991594 ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCreateOutputDirectoriesStage.java ########## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.EntryStatus; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CreateOutputDirectoriesStage; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJobStage; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig; +import org.apache.hadoop.util.Lists; + +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_IS_FILE; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_FILE_UNDER_DESTINATION; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_PREPARE_DIR_ANCESTORS; +import static 
org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test directory creation. + * As the directory creation phase relies on input from the task manifest to + * determine which directories to explicitly create, delete files at + * etc, these tests build up manifests and assert that the output + * of the directory creation stage matches that of the combination + * of the manifest and the filesystem state. + */ +public class TestCreateOutputDirectoriesStage extends AbstractManifestCommitterTest { + + /** + * Deep tree width, subclasses (including in external projects) + * may change. + */ + protected static final int DEEP_TREE_WIDTH = 4; + + /** + * The number of directories created in test setup; this must be + * added to all assertions of the value of OP_MKDIRS. + */ + private static final int DIRECTORIES_CREATED_IN_SETUP = 2; + + private Path destDir; + private CreateOutputDirectoriesStage mkdirStage; + private StageConfig stageConfig; + private IOStatisticsStore iostats; + + @Override + public void setup() throws Exception { + super.setup(); + destDir = methodPath(); + // clean up dest dir completely + destDir.getFileSystem(getConfiguration()).delete(destDir, true); + setStoreOperations(createManifestStoreOperations()); + stageConfig = createStageConfigForJob(JOB1, destDir) + .withDeleteTargetPaths(true); + setJobStageConfig(stageConfig); + // creates the job directories. 
+ new SetupJobStage(stageConfig).apply(true); + mkdirStage = new CreateOutputDirectoriesStage(stageConfig); + iostats = stageConfig.getIOStatistics(); + // assert original count of dirs created == 2 : job and task manifest + verifyStatisticCounterValue(iostats, OP_MKDIRS, + DIRECTORIES_CREATED_IN_SETUP); + // reset the value to simplify future work + iostats.getCounterReference(OP_MKDIRS).set(0); + } + + @Test + public void testPrepareSomeDirs() throws Throwable { + + final long initialFileStatusCount = lookupCounterStatistic(iostats, OP_GET_FILE_STATUS); + final int dirCount = 8; + final List<Path> dirs = subpaths(destDir, dirCount); + final List<DirEntry> dirEntries = dirEntries(dirs, 1, EntryStatus.not_found); + + // two manifests with duplicate entries + final List<TaskManifest> manifests = Lists.newArrayList( + manifestWithDirsToCreate(dirEntries), + manifestWithDirsToCreate(dirEntries)); + final CreateOutputDirectoriesStage.Result result = mkdirStage.apply(manifests); + Assertions.assertThat(result.getCreatedDirectories()) + .describedAs("output of %s", mkdirStage) + .containsExactlyInAnyOrderElementsOf(dirs); + + LOG.info("Job Statistics\n{}", ioStatisticsToPrettyString(iostats)); + + // now dirCount new dirs are added. + verifyStatisticCounterValue(iostats, OP_MKDIRS, dirCount); + + // now rerun the same preparation sequence, but this + // time declare that the directories exist (as they do) + final CreateOutputDirectoriesStage s2 = + new CreateOutputDirectoriesStage(stageConfig); + final CreateOutputDirectoriesStage.Result r2 = s2.apply( + Lists.newArrayList( + manifestWithDirsToCreate(dirEntries(dirs, 1, EntryStatus.dir)))); + + // no directories are now created. 
+ Assertions.assertThat(r2.getCreatedDirectories()) + .describedAs("output of %s", s2) + .isEmpty(); + LOG.info("Job Statistics after second pass\n{}", ioStatisticsToPrettyString(iostats)); + + // second run probed no dest dirs + verifyStatisticCounterValue(iostats, OP_GET_FILE_STATUS, initialFileStatusCount); + // and no new mkdir calls were made + verifyStatisticCounterValue(iostats, OP_MKDIRS, dirCount); + verifyStatisticCounterValue(iostats, OP_DELETE_FILE_UNDER_DESTINATION, 0); + verifyStatisticCounterValue(iostats, OP_IS_FILE, 0); + } + + /** + * Given a list of paths, build a list of DirEntry entries. + * @param paths list of paths + * @param level Level in the treewalk. + * @param entryStatus status of dirs + * @return list of entries with the given level and entry status. + */ + protected List<DirEntry> dirEntries(Collection<Path> paths, + int level, + EntryStatus entryStatus) { + return paths.stream() + .map(p -> DirEntry.dirEntry(p, entryStatus, level)) + .collect(Collectors.toList()); + } + + /** + * Create a manifest with the list of directory entries added. + * Job commit requires the entries to have been probed for, and + * for the entire tree under the dest path to be included. + * @param dirEntries list of directory entries. + * @return the manifest. + */ + protected TaskManifest manifestWithDirsToCreate(List<DirEntry> dirEntries) { + final TaskManifest taskManifest = new TaskManifest(); + taskManifest.getDestDirectories().addAll(dirEntries); + return taskManifest; + } + + /** + * Assert the directory map status of a path. + * @param result stage result + * @param path path to look up + * @param expected expected value. 
+ */ + private static void assertDirMapStatus( + CreateOutputDirectoriesStage.Result result, + Path path, + CreateOutputDirectoriesStage.DirMapState expected) { + Assertions.assertThat(result.getDirMap()) + .describedAs("Directory Map entry for %s", path) + .isNotNull() + .containsKey(path) + .containsEntry(path, expected); + } + + /** + * Prepare a deep tree {@code c ^ 3} of entries. Review comment: yes. says "use a fixed font" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 742298) Time Spent: 37h 50m (was: 37h 40m) > Add a task-manifest output committer for Azure and GCS > ------------------------------------------------------ > > Key: MAPREDUCE-7341 > URL: https://issues.apache.org/jira/browse/MAPREDUCE-7341 > Project: Hadoop Map/Reduce > Issue Type: New Feature > Components: client > Affects Versions: 3.3.1 > Reporter: Steve Loughran > Assignee: Steve Loughran > Priority: Major > Labels: pull-request-available > Time Spent: 37h 50m > Remaining Estimate: 0h > > Add a task-manifest output committer for Azure and GCS > The S3A committers are very popular in Spark on S3, as they are both correct > and fast. > The classic FileOutputCommitter v1 and v2 algorithms are all that is > available for Azure ABFS and Google GCS, and they have limitations. > The v2 algorithm isn't safe in the presence of failed task attempt commits, > so we > recommend the v1 algorithm for Azure. But that is slow because it > sequentially lists > then renames files and directories, one-by-one. The latencies of list > and rename make things slow. 
> Google GCS lacks the atomic directory rename required for v1 correctness; > v2 can be used (which doesn't have the job commit performance limitations), > but it's not safe. > Proposed > * Add a new FileOutputFormat committer which uses an intermediate manifest to > pass the list of files created by a task attempt (TA) to the job committer. > * Job committer to parallelise reading these task manifests and submit all the > rename operations into a pool of worker threads. (also: mkdir, directory > deletions on cleanup) > * Use the committer plugin mechanism added for s3a to make this the default > committer for ABFS > (i.e. no need to make any changes to FileOutputCommitter) > * Add lots of IOStatistics instrumentation + logging of operations in the > JobCommit > for visibility of where delays are occurring. > * Reuse the S3A committer _SUCCESS JSON structure to publish IOStats & other > data > for testing/support. > This committer will be faster than the V1 algorithm because of the > parallelisation, and > because a manifest written by create-and-rename will be exclusive to a single > task > attempt, it delivers the isolation which the v2 committer lacks. > This is not an attempt to do an iceberg/hudi/delta-lake style manifest-only > format > for describing the contents of a table; the final output is still a directory > tree > which must be scanned during query planning. > As such the format is still suboptimal for cloud storage - but at least we > will have > faster job execution during the commit phases. > > Note: this will also work on HDFS, where again, it should be faster than > the v1 committer. However, the target is very much Spark with ABFS and GCS; no > plans to worry about MR as that simplifies the challenge of dealing with job > restart (i.e.
you don't have to) -- This message was sent by Atlassian Jira (v8.20.1#820001) --------------------------------------------------------------------- To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org