[ https://issues.apache.org/jira/browse/MAPREDUCE-7341?focusedWorklogId=741527&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-741527 ]
ASF GitHub Bot logged work on MAPREDUCE-7341: --------------------------------------------- Author: ASF GitHub Bot Created on: 15/Mar/22 10:35 Start Date: 15/Mar/22 10:35 Worklog Time Spent: 10m Work Description: mukund-thakur commented on a change in pull request #2971: URL: https://github.com/apache/hadoop/pull/2971#discussion_r825692207 ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md ########## @@ -0,0 +1,335 @@ +<!--- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + + +# Manifest Committer Architecture + +This document describes the architecture and other implementation/correctness +aspects of the [Manifest Committer](manifest_committer.html). + +The protocol and its correctness are covered in [Manifest Committer Protocol](manifest_committer_protocol.html). +<!-- MACRO{toc|fromDepth=0|toDepth=2} --> + +The _Manifest_ committer is a committer for work which provides performance on ABFS for "real world" +queries, and performance and correctness on GCS. + +This committer uses the extension point which came in for the S3A committers. +Users can declare a new committer factory for `abfs://` and `gcs://` URLs. +It can be used through Hadoop MapReduce and Apache Spark. + +## Background + +### Terminology + +| Term | Meaning| |------|--------| | Committer | A class which can be invoked by MR/Spark to perform the task and job commit operations. | | Spark Driver | The Spark process scheduling the work and choreographing the commit operation.| | Job | In MapReduce, the entire application. In Spark, this is a single stage in a chain of work | | Job Attempt | A single attempt at a job. MR supports multiple Job attempts with recovery on partial job failure. Spark says "start again from scratch" | | Task | a subsection of a job, such as processing one file, or one part of a file | | Task ID | ID of the task, unique within this job. Usually starts at 0 and is used in filenames (part-0000, part-001, etc.) | | Task attempt (TA) | An attempt to perform a task. It may fail, in which case MR/Spark will schedule another. | | Task Attempt ID | A unique ID for the task attempt. The Task ID + an attempt counter.| | Destination directory | The final destination of work.| Review comment: nit: extra space in the start. ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterFactory.java ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory; + +/** + * This is the committer factory to register as the source of committers + * for the job/filesystem schema. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class ManifestCommitterFactory extends PathOutputCommitterFactory { + + public static final Logger LOG = LoggerFactory.getLogger( + ManifestCommitterFactory.class); + + /** + * Name of this factory. + */ + public static final String NAME = ManifestCommitterFactory.class.getName(); + + @Override + public ManifestCommitter createOutputCommitter(final Path outputPath, + final TaskAttemptContext context) throws IOException { + // safety check. S3A does not support this, so fail fast. + if ("s3a".equals(outputPath.toUri().getScheme())) { Review comment: nit: hardcoding the s3a scheme? ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md ########## @@ -0,0 +1,335 @@ +<!--- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + + +# Manifest Committer Architecture + +This document describes the architecture and other implementation/correctness +aspects of the [Manifest Committer](manifest_committer.html). + +The protocol and its correctness are covered in [Manifest Committer Protocol](manifest_committer_protocol.html). +<!-- MACRO{toc|fromDepth=0|toDepth=2} --> + +The _Manifest_ committer is a committer for work which provides performance on ABFS for "real world" +queries, and performance and correctness on GCS. + +This committer uses the extension point which came in for the S3A committers. +Users can declare a new committer factory for `abfs://` and `gcs://` URLs. +It can be used through Hadoop MapReduce and Apache Spark. + +## Background + +### Terminology + +| Term | Meaning| |------|--------| | Committer | A class which can be invoked by MR/Spark to perform the task and job commit operations. | | Spark Driver | The Spark process scheduling the work and choreographing the commit operation.| | Job | In MapReduce, the entire application. 
In Spark, this is a single stage in a chain of work | +| Job Attempt | A single attempt at a job. MR supports multiple Job attempts with recovery on partial job failure. Spark says "start again from scratch" | +| Task | a subsection of a job, such as processing one file, or one part of a file | +| Task ID | ID of the task, unique within this job. Usually starts at 0 and is used in filenames (part-0000, part-001, etc.) | +| Task attempt (TA) | An attempt to perform a task. It may fail, in which case MR/Spark will schedule another. | +| Task Attempt ID | A unique ID for the task attempt. The Task ID + an attempt counter.| +| Destination directory | The final destination of work.| +| Job Attempt Directory | A temporary directory used by the job attempt. This is always _underneath_ the destination directory, so as to ensure it is in the same encryption zone as HDFS, storage volume in other filesystems, etc.| +| Task Attempt directory | (also known as "Task Attempt Working Directory"). Directory exclusive for each task attempt under which files are written | +| Task Commit | Taking the output of a Task Attempt and making it the final/exclusive result of that "successful" Task.| +| Job Commit | aggregating all the outputs of all committed tasks and producing the final results of the job. | + + + +The purpose of a committer is to ensure that the complete output of +a job ends up in the destination, even in the presence of failures of tasks. + +- _Complete:_ the output includes the work of all successful tasks. +- _Exclusive:_ the output of unsuccessful tasks is not present. +- _Concurrent:_ When multiple tasks are committed in parallel the output is the same as when + the task commits are serialized. This is not a requirement of Job Commit. +- _Abortable:_ jobs and tasks may be aborted prior to job commit, after which their output is not visible. +- _Continuity of correctness:_ once a job is committed, the output of any failed, + aborted, or unsuccessful task MUST NOT appear at any point in the future. + +For Hive's classic hierarchical-directory-structured tables, job committing +requires the output of all committed tasks to be put into the correct location +in the directory tree. + +The committer built into the `hadoop-mapreduce-client-core` module is the `FileOutputCommitter`. + + + +## The Manifest Committer: A high performance committer for Spark on Azure and Google storage. + +The Manifest Committera higher performance committer for ABFS and GCS storage Review comment: typo: Committera-> Committer ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md ########## @@ -0,0 +1,335 @@ +<!--- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. 
+--> + + +# Manifest Committer Architecture + +This document describes the architecture and other implementation/correctness +aspects of the [Manifest Committer](manifest_committer.html). + +The protocol and its correctness are covered in [Manifest Committer Protocol](manifest_committer_protocol.html). +<!-- MACRO{toc|fromDepth=0|toDepth=2} --> + +The _Manifest_ committer is a committer for work which provides performance on ABFS for "real world" +queries, and performance and correctness on GCS. + +This committer uses the extension point which came in for the S3A committers. +Users can declare a new committer factory for `abfs://` and `gcs://` URLs. +It can be used through Hadoop MapReduce and Apache Spark. + +## Background + +### Terminology + +| Term | Meaning| |------|--------| | Committer | A class which can be invoked by MR/Spark to perform the task and job commit operations. | | Spark Driver | The Spark process scheduling the work and choreographing the commit operation.| | Job | In MapReduce, the entire application. In Spark, this is a single stage in a chain of work | +| Job Attempt | A single attempt at a job. MR supports multiple Job attempts with recovery on partial job failure. Spark says "start again from scratch" | +| Task | a subsection of a job, such as processing one file, or one part of a file | +| Task ID | ID of the task, unique within this job. Usually starts at 0 and is used in filenames (part-0000, part-001, etc.) | +| Task attempt (TA) | An attempt to perform a task. It may fail, in which case MR/Spark will schedule another. | +| Task Attempt ID | A unique ID for the task attempt. The Task ID + an attempt counter.| +| Destination directory | The final destination of work.| +| Job Attempt Directory | A temporary directory used by the job attempt. This is always _underneath_ the destination directory, so as to ensure it is in the same encryption zone as HDFS, storage volume in other filesystems, etc.| +| Task Attempt directory | (also known as "Task Attempt Working Directory"). Directory exclusive for each task attempt under which files are written | +| Task Commit | Taking the output of a Task Attempt and making it the final/exclusive result of that "successful" Task.| +| Job Commit | aggregating all the outputs of all committed tasks and producing the final results of the job. | + + + +The purpose of a committer is to ensure that the complete output of +a job ends up in the destination, even in the presence of failures of tasks. + +- _Complete:_ the output includes the work of all successful tasks. +- _Exclusive:_ the output of unsuccessful tasks is not present. +- _Concurrent:_ When multiple tasks are committed in parallel the output is the same as when + the task commits are serialized. This is not a requirement of Job Commit. +- _Abortable:_ jobs and tasks may be aborted prior to job commit, after which their output is not visible. +- _Continuity of correctness:_ once a job is committed, the output of any failed, + aborted, or unsuccessful task MUST NOT appear at any point in the future. + +For Hive's classic hierarchical-directory-structured tables, job committing +requires the output of all committed tasks to be put into the correct location +in the directory tree. + +The committer built into the `hadoop-mapreduce-client-core` module is the `FileOutputCommitter`. + + + +## The Manifest Committer: A high performance committer for Spark on Azure and Google storage. 
+ +The Manifest Committera higher performance committer for ABFS and GCS storage +for jobs which create files across deep directory trees through many tasks. + +It will also work on `hdfs://` and indeed, `file://` URLs, but +it is optimized to address listing and renaming performance and throttling +issues in cloud storage. + +It *will not* work correctly with S3, because it relies on an atomic rename-no-overwrite +operation to commit the manifest file. It will also have the performance +problems of copying rather than moving all the generated data. + +Although it will work with MapReduce, +there is no handling of multiple job attempts with recovery from previous failed +attempts. (Plan: fail on MR AM restart) + +### The Manifest + +A Manifest file is designed which contains (along with IOStatistics and some +other things): + +1. A list of destination directories which must be created if they do not exist. +1. A list of files to rename, recorded as (absolute source, absolute destination, + file-size) entries. + +### Task Commit + +Task attempts are committed by: + +1. Recursively listing the task attempt working dir to build + 1. A list of directories under which files are renamed. + 2. A list of files to rename: source, destination, size and optionally, etag. +2. Saving this information in a manifest file in the job attempt directory with + a filename derived from the Task ID. + Note: writing to a temp file and then renaming to the final path will be used + to ensure the manifest creation is atomic. + + +No renaming takes place — the files are left in their original location. + +The directory treewalk is single-threaded; it is `O(directories)`, +with each directory listing using one or more paged LIST calls. + +This is simple, and for most tasks, the scan is off the critical path of the job. + +Statistics analysis may justify moving to parallel scans in future. + + +### Job Commit + +Job Commit consists of: + +1. List all manifest files in the job attempt directory. +1. Load each manifest file, create directories which do not yet exist, then + rename each file in the rename list. +1. Save a JSON `_SUCCESS` file with the same format as the S3A committer (for + testing; use write and rename for atomic save) + +The job commit phase supports parallelization for many tasks and many files +per task, specifically: + +1. Manifest tasks are loaded and processed in a pool of "manifest processor" + threads. +2. Directory creation and file rename operations are each processed in a pool of "executor" threads: many renames can execute in parallel as they use minimal + network IO. +3. Job cleanup can parallelize deletion of task attempt directories. This + is relevant as directory deletion is `O(files)` on Google cloud storage, + and also on ABFS when OAuth authentication is used. + + +### Ancestor directory preparation + +Optional scan of all ancestors: if any are files, delete them. + + +### Parent directory creation + +1. Probe the shared directory map for the directory existing. If found: the operation is + complete. +1. If the map is empty, call `getFileStatus()` on the path. Not found: create the + directory, add an entry for it and all parent paths. Found and is a directory: + add an entry for it and all parent paths. Found and is a file: delete it, then + create as before. + +Efficiently handling concurrent creation of directories (or delete+create) is going to be a +trouble spot; some effort is invested there to build the set of directories to +create. + +### File Rename + +Files are renamed in parallel. 
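[Editorial note: to make the parallel rename step concrete, here is a minimal sketch of queueing a manifest's (source, destination) entries into an executor and failing fast on the first error. This is not code from the patch: `RenameEntry` is a simplified stand-in for the manifest's `FileEntry`, and the pool size and error propagation are illustrative only; the real committer drives this through its `TaskPool` and store-operations classes.]

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class ParallelRenameSketch {

  /** Rename every manifest entry in parallel; any failure fails the commit. */
  public static void renameAll(FileSystem fs, List<RenameEntry> entries)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(16);
    try {
      CompletableFuture<?>[] futures = entries.stream()
          .map(e -> CompletableFuture.runAsync(() -> {
            try {
              // each rename is a single metadata operation: cheap network IO
              if (!fs.rename(e.source, e.dest)) {
                throw new IllegalStateException("failed to rename " + e.source);
              }
            } catch (IOException ex) {
              throw new RuntimeException(ex);
            }
          }, pool))
          .toArray(CompletableFuture[]::new);
      // the first failure surfaces here and halts the whole job commit
      CompletableFuture.allOf(futures).join();
    } finally {
      pool.shutdown();
    }
  }

  /** Simplified stand-in for the manifest's (source, destination, size) entry. */
  public static final class RenameEntry {
    final Path source;
    final Path dest;
    RenameEntry(Path source, Path dest) {
      this.source = source;
      this.dest = dest;
    }
  }
}
```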
+ +A pre-rename check for anything being at that path (and deleting it) will be optional. +With Spark creating new UUIDs for each file, this isn't going to happen, and +skipping the check saves HTTP requests. + + +### Validation + +Optional scan of all committed files to verify length and, if known, +etag. For testing and diagnostics. + +## Benefits + +* Pushes the source tree list operations into the task commit phase, which is + generally off the critical path of execution +* Provides an atomic task commit to GCS, as there is no expectation that + directory rename is atomic +* It is possible to pass IOStatistics from workers in the manifest. +* Allows for some pre-rename operations similar to the S3A "Partitioned Staging + committer". This can be configured to delete all existing entries in + directories scheduled to be created - or fail if those partitions are + non-empty. + See [Partitioned Staging Committer](../../hadoop-aws/tools/hadoop-aws/committers.html#The_.E2.80.9CPartitioned.E2.80.9D_Staging_Committer) +* Allows for an optional preflight validation check (verify no duplicate files created by different tasks) +* Manifests can be viewed, size of output determined, etc., during + development/debugging. + +### Disadvantages + +* Needs a new manifest file format. +* May make task commit more complex. + +This solution is necessary for GCS and should be beneficial on ABFS as listing +overheads are paid for in the task committers. + +# Implementation Details + +### Constraints + +A key goal is to keep the manifest committer isolated and neither +touch the existing committer code nor other parts of the Hadoop codebase. + +It must plug directly into MR and Spark without needing any changes +other than those already implemented for the S3A committers: + +* Self-contained: MUST NOT require changes to hadoop-common, etc. +* Isolated: MUST NOT make changes to existing committers +* Integrated: MUST bind via `PathOutputCommitterFactory`. + +As a result of this, there's a bit of copy and paste from elsewhere, +e.g. `org.apache.hadoop.util.functional.TaskPool` +is based on S3ACommitter's `org.apache.hadoop.fs.s3a.commit.Tasks`. + +The `_SUCCESS` file MUST be compatible with the S3A JSON file. +This is to ensure any existing test suites which validate +S3A committer output can be retargeted at jobs executed +by the manifest committer without any changes. + + +#### Progress callbacks in job commit. + +When? Proposed: heartbeat until renaming finally finishes. + +#### Error handling and aborting in job commit. + +We would want to stop the entire job commit. Some atomic boolean "abort job" +would need to be checked in the processing of each task committer thread's +iteration through a directory (or processing of each file?) +Failures in listing or renaming will need to be escalated to halting the entire +job commit. This implies that any IOE raised in an asynchronous rename operation or +in a task committer thread must: + +1. be caught +1. be stored in a shared field/variable +1. trigger the abort +1. be rethrown at the end of the `commitJob()` call (see the sketch below) + +#### Avoiding deadlocks + +If a job commit stage is using a thread pool for per-task operations, e.g. loading +files, that same thread pool MUST NOT be used for parallel operations within +the per-task stage. + +As every `JobStage` is executed in sequence within task or job commit, it +is safe to share the same thread pool across stages. + +In the current implementation, there is no parallel "per manifest" operation +in job commit other than for actually loading the files. 
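[Editorial note: a minimal sketch of the catch/store/abort/rethrow sequence in the numbered list above. The names (`JobCommitSketch`, `processManifest`, `maybeRethrow`) are illustrative, not the committer's actual API.]

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class JobCommitSketch {

  private final AtomicBoolean abortJob = new AtomicBoolean(false);
  private final AtomicReference<IOException> firstFailure =
      new AtomicReference<>();

  /** Called from each "manifest processor" thread. */
  void processManifest(Runnable work) {
    if (abortJob.get()) {
      return;                  // another thread already failed; stop early
    }
    try {
      work.run();
    } catch (RuntimeException e) {
      // store only the first failure, then trigger the abort
      firstFailure.compareAndSet(null, new IOException(e));
      abortJob.set(true);
    }
  }

  /** At the end of commitJob(): rethrow whatever was caught. */
  void maybeRethrow() throws IOException {
    IOException e = firstFailure.get();
    if (e != null) {
      throw e;
    }
  }
}
```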
+The operations to create directories and to rename files are actually +performed without parallel processing of individual manifests. + +Directory Preparation: merge the directory lists of all manifests, +then queue for creation the (hopefully very much smaller) set of unique +directories. + +Rname: iterate through all manifests and queue their renames into a pool for Review comment: typo: Rename ########## File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import java.time.Duration; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter; + +/** + * Factory for Rate Limiting. + * This should be only place in the code where the guava RateLimiter is imported. + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class RateLimitingFactory { + + private static final RateLimiting UNLIMITED = new NoRateLimiting(); + + /** + * No waiting took place. + */ + private static final Duration INSTANTLY = Duration.ofMillis(0); + + private RateLimitingFactory() { + } + + /** + * No Rate Limiting. + */ + private static class NoRateLimiting implements RateLimiting { + + + @Override + public Duration acquire(int capacity) { + return INSTANTLY; + } + } + + /** + * Rate limiting restricted to that of a google rate limiter. + */ + private static final class RestrictedRateLimiting implements RateLimiting { + private final RateLimiter limiter; + + /** + * Constructor. + * @param capacity capacity in permits/second. + */ + private RestrictedRateLimiting(int capacity) { Review comment: Should we change the name to maxCapacity? ########## File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Minimal subset of google rate limiter class. + * Can be used to throttle use of object stores where excess load + * will trigger cluster-wide throttling, backoff etc. and so collapse + * performance. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface RateLimiting { Review comment: I think we need to experiment/test more before moving this for all operations in the FS itself. ########## File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import java.io.IOException; +import java.time.Duration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem; + +/** + * Extension of StoreOperationsThroughFileSystem with ABFS awareness. + * Purely for use by jobs committing work through the manifest committer. + * The {@link AzureManifestCommitterFactory} will configure the Review comment: typo: will configure twice. ########## File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Duration; +import javax.annotation.Nullable; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; + +/** + * API exclusively for committing files. + * + * This is only for use by {@link AbfsManifestStoreOperations}, + * and is intended to be implemented by ABFS. + * To ensure that there is no need to add mapreduce JARs to the + * classpath just to work with ABFS, this interface + * MUST NOT refer to anything in the + * {@code org.apache.hadoop.mapreduce} package. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface ResilientCommitByRename extends IOStatisticsSource { Review comment: Should we move this to hadoop-common such that other stores can also implement? ########## File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java ########## @@ -438,24 +459,88 @@ public boolean rename(final Path src, final Path dst) throws IOException { qualifiedDstPath = makeQualified(adjustedDst); - abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext); + abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null); return true; - } catch(AzureBlobFileSystemException ex) { + } catch (AzureBlobFileSystemException ex) { LOG.debug("Rename operation failed. ", ex); checkException( - src, - ex, - AzureServiceErrorCode.PATH_ALREADY_EXISTS, - AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH, - AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND, - AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE, - AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND, - AzureServiceErrorCode.INTERNAL_OPERATION_ABORT); + src, + ex, + AzureServiceErrorCode.PATH_ALREADY_EXISTS, + AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH, + AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND, + AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE, + AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND, + AzureServiceErrorCode.INTERNAL_OPERATION_ABORT); return false; } } + /** + * Private method to create resilient commit support. + * @return a new instance + * @param path destination path + * @throws IOException problem probing store capabilities + * @throws UnsupportedOperationException if the store lacks this support + */ + @InterfaceAudience.Private + public ResilientCommitByRename createResilientCommitSupport(final Path path) + throws IOException { + + if (!hasPathCapability(path, + CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME)) { + throw new UnsupportedOperationException( + "Resilient commit support not available for " + path); + } + return new ResilientCommitByRenameImpl(); + } + + /** + * Resilient commit support. 
+ * Provided as a nested class to avoid contaminating the + * FS instance with too many private methods which end up + * being used widely (as has happened to the S3A FS) + */ + public class ResilientCommitByRenameImpl implements ResilientCommitByRename { + public Pair<Boolean, Duration> commitSingleFileByRename( Review comment: nit: newline ########## File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java ########## @@ -438,24 +459,88 @@ public boolean rename(final Path src, final Path dst) throws IOException { qualifiedDstPath = makeQualified(adjustedDst); - abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext); + abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null); return true; - } catch(AzureBlobFileSystemException ex) { + } catch (AzureBlobFileSystemException ex) { LOG.debug("Rename operation failed. ", ex); checkException( - src, - ex, - AzureServiceErrorCode.PATH_ALREADY_EXISTS, - AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH, - AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND, - AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE, - AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND, - AzureServiceErrorCode.INTERNAL_OPERATION_ABORT); + src, + ex, + AzureServiceErrorCode.PATH_ALREADY_EXISTS, + AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH, + AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND, + AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE, + AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND, + AzureServiceErrorCode.INTERNAL_OPERATION_ABORT); return false; } } + /** + * Private method to create resilient commit support. + * @return a new instance + * @param path destination path + * @throws IOException problem probing store capabilities + * @throws UnsupportedOperationException if the store lacks this support + */ + @InterfaceAudience.Private + public ResilientCommitByRename createResilientCommitSupport(final Path path) + throws IOException { + + if (!hasPathCapability(path, + CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME)) { + throw new UnsupportedOperationException( + "Resilient commit support not available for " + path); + } + return new ResilientCommitByRenameImpl(); + } + + /** + * Resilient commit support. + * Provided as a nested class to avoid contaminating the + * FS instance with too many private methods which end up + * being used widely (as has happened to the S3A FS) + */ + public class ResilientCommitByRenameImpl implements ResilientCommitByRename { + public Pair<Boolean, Duration> commitSingleFileByRename( + final Path source, + final Path dest, + @Nullable final String sourceEtag) throws IOException { + + LOG.debug("renameFileWithEtag source: {} dest: {} etag {}", source, dest, sourceEtag); + statIncrement(CALL_RENAME); + + trailingPeriodCheck(dest); + Path qualifiedSrcPath = makeQualified(source); + Path qualifiedDstPath = makeQualified(dest); + + TracingContext tracingContext = new TracingContext(clientCorrelationId, + fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat, + listener); + + if (qualifiedSrcPath.equals(qualifiedDstPath)) { + // rename to itself is forbidden + throw new PathIOException(qualifiedSrcPath.toString(), "cannot rename object onto self"); + } + + // acquire one IO permit + final Duration waitTime = rateLimiting.acquire(1); Review comment: So this ratelimiting is only used in rename not others? 
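[Editorial note: for illustration, a minimal sketch of how the `RateLimiting.acquire()` call quoted above wraps an IO operation such as the commit rename; only `acquire(int)` from the quoted interface is used, everything else here is hypothetical and limiter construction is elided, since the factory methods are not shown in this hunk.]

```java
import java.time.Duration;

import org.apache.hadoop.util.RateLimiting;

public class RateLimitedOperationSketch {

  private final RateLimiting rateLimiting;

  public RateLimitedOperationSketch(RateLimiting rateLimiting) {
    this.rateLimiting = rateLimiting;
  }

  /** Run an operation under the limiter; returns how long the caller was throttled. */
  public Duration throttledOperation(Runnable operation) {
    // acquire one IO permit; blocks if earlier callers exhausted the bucket.
    // There is no release(): with a token-bucket limiter, capacity refills
    // with the passage of time rather than when the IO completes.
    Duration wait = rateLimiting.acquire(1);
    operation.run();
    return wait;
  }
}
```

[This also speaks to the release question raised below: guava's `RateLimiter`, which backs the factory's restricted implementation, is a token bucket whose permits refill over time, so callers never free capacity explicitly.]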
########## File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import java.time.Duration; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter; + +/** + * Factory for Rate Limiting. + * This should be only place in the code where the guava RateLimiter is imported. + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class RateLimitingFactory { + + private static final RateLimiting UNLIMITED = new NoRateLimiting(); + + /** + * No waiting took place. + */ + private static final Duration INSTANTLY = Duration.ofMillis(0); + + private RateLimitingFactory() { + } + + /** + * No Rate Limiting. + */ + private static class NoRateLimiting implements RateLimiting { + + + @Override + public Duration acquire(int capacity) { + return INSTANTLY; + } + } + + /** + * Rate limiting restricted to that of a google rate limiter. + */ + private static final class RestrictedRateLimiting implements RateLimiting { + private final RateLimiter limiter; + + /** + * Constructor. + * @param capacity capacity in permits/second. + */ + private RestrictedRateLimiting(int capacity) { + this.limiter = RateLimiter.create(capacity); + } + + @Override + public Duration acquire(int capacity) { Review comment: and this requestedCapacity for better understanding? ########## File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.util; + +import java.time.Duration; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Minimal subset of google rate limiter class. + * Can be used to throttle use of object stores where excess load + * will trigger cluster-wide throttling, backoff etc. and so collapse + * performance. + * The time waited is returned as a Duration type. + * The google rate limiter implements this by allowing a caller to ask for + * more capacity than is available. This will be granted, + * but the subsequent request will be blocked if the bucket of + * capacity hasn't yet refilled to the point where there is + * capacity again. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface RateLimiting { + + /** + * Acquire rate limiter capacity. + * If there is not enough space, the permits will be acquired, + * but the subsequent call will block until the capacity has been + * refilled. + * @param capacity capacity to acquire. + * @return time spent waiting for output. + */ + Duration acquire(int capacity); Review comment: Are there no operations to release? How will capacity be freed once IO is done? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 741527) Time Spent: 33h 40m (was: 33.5h) > Add a task-manifest output committer for Azure and GCS > ------------------------------------------------------ > > Key: MAPREDUCE-7341 > URL: https://issues.apache.org/jira/browse/MAPREDUCE-7341 > Project: Hadoop Map/Reduce > Issue Type: New Feature > Components: client > Affects Versions: 3.3.1 > Reporter: Steve Loughran > Assignee: Steve Loughran > Priority: Major > Labels: pull-request-available > Time Spent: 33h 40m > Remaining Estimate: 0h > > Add a task-manifest output committer for Azure and GCS > The S3A committers are very popular in Spark on S3, as they are both correct > and fast. > The classic FileOutputCommitter v1 and v2 algorithms are all that is > available for Azure ABFS and Google GCS, and they have limitations. > The v2 algorithm isn't safe in the presence of failed task attempt commits, > so we > recommend the v1 algorithm for Azure. But that is slow because it > sequentially lists > then renames files and directories, one-by-one. The latencies of list > and rename make things slow. > Google GCS lacks the atomic directory rename required for v1 correctness; > v2 can be used (which doesn't have the job commit performance limitations), > but it's not safe. > Proposed > * Add a new FileOutputFormat committer which uses an intermediate manifest to > pass the list of files created by a TA to the job committer. > * Job committer to parallelise reading these task manifests and submit all the > rename operations into a pool of worker threads. (also: mkdir, directory > deletions on cleanup) > * Use the committer plugin mechanism added for s3a to make this the default > committer for ABFS > (i.e. no need to make any changes to FileOutputCommitter) > * Add lots of IOStatistics instrumentation + logging of operations in the > JobCommit > for visibility of where delays are occurring. 
> * Reuse the S3A committer _SUCCESS JSON structure to publish IOStats & other > data > for testing/support. > This committer will be faster than the V1 algorithm because of the > parallelisation, and > because a manifest written by create-and-rename will be exclusive to a single > task > attempt, it delivers the isolation which the v2 committer lacks. > This is not an attempt to do an iceberg/hudi/delta-lake style manifest-only > format > for describing the contents of a table; the final output is still a directory > tree > which must be scanned during query planning. > As such the format is still suboptimal for cloud storage, but at least we > will have > faster job execution during the commit phases. > > Note: this will also work on HDFS, where again, it should be faster than > the v1 committer. However, the target is very much Spark with ABFS and GCS; no > plans to worry about MR as that simplifies the challenge of dealing with job > restart (i.e. you don't have to) -- This message was sent by Atlassian Jira (v8.20.1#820001) --------------------------------------------------------------------- To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org