[ https://issues.apache.org/jira/browse/MAPREDUCE-7341?focusedWorklogId=739519&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-739519 ]
ASF GitHub Bot logged work on MAPREDUCE-7341:
---------------------------------------------

            Author: ASF GitHub Bot
        Created on: 10/Mar/22 15:00
        Start Date: 10/Mar/22 15:00
Worklog Time Spent: 10m
  Work Description: attilapiros commented on a change in pull request #2971:
URL: https://github.com/apache/hadoop/pull/2971#discussion_r823751218


##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md
##########
@@ -0,0 +1,335 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+
+# Manifest Committer Architecture
+
+This document describes the architecture and other implementation/correctness
+aspects of the [Manifest Committer](manifest_committer.html).
+
+The protocol and its correctness are covered in
+[Manifest Committer Protocol](manifest_committer_protocol.html).
+
+<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
+
+The _Manifest_ committer is a committer for work which provides performance on ABFS for "real world"
+queries, and performance and correctness on GCS.
+
+This committer uses the extension point which came in for the S3A committers.
+Users can declare a new committer factory for `abfs://` and `gcs://` URLs.
+It can be used through Hadoop MapReduce and Apache Spark.
+
+## Background
+
+### Terminology
+
+| Term | Meaning|
+|------|--------|
+| Committer | A class which can be invoked by MR Spark to perform the task and job commit operations. |

Review comment:
```suggestion
| Committer | A class which can be invoked by MR/Spark to perform the task and job commit operations. |
```



##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md
##########
@@ -0,0 +1,567 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+
+# The Manifest Committer for Azure and Google Cloud Storage
+
+This document describes how to use the _Manifest Committer_.
+
+The _Manifest_ committer is a committer for work which provides
+performance on ABFS for "real world" queries,
+and performance and correctness on GCS.
+
+The architecture and implementation of the committer is covered in
+[Manifest Committer Architecture](manifest_committer_architecture.html).
+
+
+The protocol and its correctness are covered in
+[Manifest Committer Protocol](manifest_committer_protocol.html).
+
+<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
+
+## Problem:
+
+The only committer of work from Spark to Azure ADLS Gen 2 "abfs://" storage
+which is safe to use is the "v1 file committer".
+
+This is "correct" in that if a task attempt fails, its output is guaranteed not
+to be included in the final output. The "v2" commit algorithm cannot meet that
+guarantee, which is why it is no longer the default.
+
+But: it is slow, especially on jobs where deep directory trees of output are used.
+Why is it slow? It's hard to point at a particular cause, primarily because of
+the lack of any instrumentation in the `FileOutputCommitter`.
+Stack traces of running jobs generally show `rename()`, though list operations
+do surface too.
+
+On Google GCS, neither the v1 nor the v2 algorithm is _safe_ because the Google
+filesystem doesn't have the atomic directory rename which the v1 algorithm
+requires.
+
+A further issue is that both Azure and GCS storage may encounter scale issues
+with deleting directories with many descendants.
+This can trigger timeouts because the FileOutputCommitter assumes that
+cleaning up after the job is a fast call to `delete("_temporary", true)`.
+
+## Solution.
+
+The _Intermediate Manifest_ committer is a new committer for
+work which should deliver performance on ABFS
+for "real world" queries, and performance and correctness on GCS.
+
+This committer uses the extension point which came in for the S3A committers.
+Users can declare a new committer factory for abfs:// and gcs:// URLs.
+A suitably configured Spark deployment will pick up the new committer.
+
+Directory performance issues in job cleanup can be addressed by two options:
+1. The committer can be configured to move the temporary directory under `~/.trash`
+   in the cluster FS. This may benefit Azure, but will not benefit GCS.
+1. The committer will parallelize deletion of task attempt directories before
+   deleting the `_temporary` directory.
+   This is highly beneficial on GCS; may or may not be beneficial on ABFS.
+   (More specifically: use it if you are using OAuth to authenticate).
+
+Suitably configured MR and Spark deployments will pick up the new committer.
+
+The committer can be used with any filesystem client which has a "real" file rename:
+only one process may rename a given file, and if the destination already exists,
+the caller is notified.
+
+
+
+# <a name="how"></a> How it works
+
+The full details are covered in [Manifest Committer Architecture](manifest_committer_architecture.html).
+
+# <a name="use"></a> Using the committer
+
+The hooks put in to support the S3A committers were designed to allow every
+filesystem schema to provide their own committer.
+See [Switching To an S3A Committer](../../hadoop-aws/tools/hadoop-aws/committers.html#Switching_to_an_S3A_Committer)
+
+A factory for the abfs schema would be defined in
+`mapreduce.outputcommitter.factory.scheme.abfs`; and a similar one for `gcs`.
+
+Some matching Spark configuration changes, especially for the Parquet binding, will be required.
+These can be done in `core-site.xml`, if it is not defined in the `mapred-default.xml` JAR.
+
+
+```xml
+<property>
+  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
+  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
+</property>
+<property>
+  <name>mapreduce.outputcommitter.factory.scheme.gs</name>
+  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory</value>
+</property>
+```
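[Editorial aside, not part of the PR diff: the factory lookup driven by these properties can also be probed programmatically. A minimal sketch, assuming the Hadoop 3.1+ `PathOutputCommitterFactory` API; the `FactoryProbe` class name and the container/account in the example path are illustrative placeholders.]

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;

// Illustrative probe: resolve which committer factory a given path binds to.
// This is the same lookup the cloudstore `committerinfo` command (described
// below) performs before instantiating a committer.
public class FactoryProbe {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    Path path = new Path("abfs://container@account.dfs.core.windows.net/");
    PathOutputCommitterFactory factory =
        PathOutputCommitterFactory.getCommitterFactory(path, conf);
    System.out.println("Factory for " + path + " is "
        + factory.getClass().getName());
  }
}
```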
+
+## Binding to the manifest committer in Spark.
+
+In Apache Spark, the `spark-default` configuration needs to switch to using the committer factory
+mechanism to instantiate committers. This includes configuring Parquet with a subclass of the parquet
+committer which uses the factory mechanism internally.
+
+```
+spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
+spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
+spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
+spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
+```
+
+
+### <a name="committerinfo"></a> Using the Cloudstore `committerinfo` command to probe committer bindings.
+
+The Hadoop committer settings can be validated with a recent build of [cloudstore](https://github.com/steveloughran/cloudstore)
+and its `committerinfo` command.
+This command instantiates a committer for that path through the same factory mechanism as MR and Spark jobs use,
+then prints its `toString` value.
+
+```
+hadoop jar cloudstore-1.0.jar committerinfo abfs://test...@ukwest.dfs.core.windows.net/
+
+2021-09-16 19:42:59,731 [main] INFO commands.CommitterInfo (StoreDurationInfo.java:<init>(53)) - Starting: Create committer
+Committer factory for path abfs://test...@ukwest.dfs.core.windows.net/ is
+ org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory@3315d2d7
+ (classname org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory)
+2021-09-16 19:43:00,897 [main] INFO manifest.ManifestCommitter (ManifestCommitter.java:<init>(144)) - Created ManifestCommitter with
+ JobID job__0000, Task Attempt attempt__0000_r_000000_1 and destination abfs://test...@ukwest.dfs.core.windows.net/
+Created committer of class org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter:
+ ManifestCommitter{ManifestCommitterConfig{destinationDir=abfs://test...@ukwest.dfs.core.windows.net/,
+ role='task committer',
+ taskAttemptDir=abfs://test...@ukwest.dfs.core.windows.net/_temporary/manifest_job__0000/0/_temporary/attempt__0000_r_000000_1,
+ createJobMarker=true,
+ jobUniqueId='job__0000',
+ jobUniqueIdSource='JobID',
+ jobAttemptNumber=0,
+ jobAttemptId='job__0000_0',
+ taskId='task__0000_r_000000',
+ taskAttemptId='attempt__0000_r_000000_1'},
+ iostatistics=counters=();
+
+gauges=();
+
+minimums=();
+
+maximums=();
+
+means=();
+}
+
+```
+
+
+## Verifying that the committer was used
+
+The new committer will write a JSON summary of the operation, including statistics, in the `_SUCCESS` file.
+
+If this file exists and is zero bytes long: the classic `FileOutputCommitter` was used.
+
+If this file exists and is greater than zero bytes wrong, either the manifest committer was used,

Review comment:
```suggestion
If this file exists and is greater than zero bytes long, either the manifest committer was used,
```



##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_protocol.md
##########
@@ -0,0 +1,619 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+
+# Manifest Committer Protocol
+
+This document describes the commit protocol
+of the [Manifest Committer](manifest_committer.html).
+
+<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
+
+## Background
+
+### Terminology
+
+| Term | Meaning|
+|------|--------|
+| Committer | A class which can be invoked by MR/Spark to perform the task and job commit operations. |
+| Spark Driver | The Spark process scheduling the work and choreographing the commit operation.|
+| Job | In MapReduce, the entire application. In Spark, this is a single stage in a chain of work. |
+| Job Attempt | A single attempt at a job. MR supports multiple job attempts with recovery on partial job failure. Spark says "start again from scratch". |
+| Task | A subsection of a job, such as processing one file, or one part of a file. |
+| Task ID | ID of the task, unique within this job. Usually starts at 0 and is used in filenames (part-0000, part-0001, etc.) |
+| Task attempt (TA) | An attempt to perform a task. It may fail, in which case MR/Spark will schedule another. |
+| Task Attempt ID | A unique ID for the task attempt: the Task ID + an attempt counter.|
+| Destination directory | The final destination of work.|
+| Job Attempt Directory | A temporary directory used by the job attempt. This is always _underneath_ the destination directory, so as to ensure it is in the same encryption zone on HDFS, storage volume in other filesystems, etc.|
+| Task Attempt directory | Directory under the Job Attempt Directory where task attempts create subdirectories for their own work. |
+| Task Attempt Working Directory | Directory exclusive to each task attempt, under which files are written. |
+| Task Commit | Taking the output of a Task Attempt and making it the final/exclusive result of that "successful" Task. |
+| Job Commit | Aggregating all the outputs of all committed tasks and producing the final results of the job. |
+
+
+
+The purpose of a committer is to ensure that the complete output of
+a job ends up in the destination, even in the presence of failures of tasks.
+
+- _Complete:_ the output includes the work of all successful tasks.
+- _Exclusive:_ the output of unsuccessful tasks is not present.
+- _Concurrent:_ when multiple tasks are committed in parallel the output is the same as when
+  the task commits are serialized. This is not a requirement of Job Commit.
+- _Abortable:_ jobs and tasks may be aborted prior to job commit, after which their output is not visible.
+- _Continuity of correctness:_ once a job is committed, the output of any failed,
+  aborted, or unsuccessful task MUST NOT appear at some point in the future.
+
+For Hive's classic hierarchical-directory-structured tables, job committing
+requires the output of all committed tasks to be put into the correct location
+in the directory tree.
+
+The committer built into the `hadoop-mapreduce-client-core` module is the `FileOutputCommitter`.
+
+
+It has two algorithms, v1 and v2.
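[Editorial aside, not part of the PR diff: for reference, the algorithm is selected by the real `FileOutputCommitter` configuration switch shown below; a minimal snippet in the style of the XML examples elsewhere in these docs, with `1` selecting the v1 algorithm.]

```xml
<property>
  <!-- 1 = rename-based v1 algorithm; 2 = the faster but unsafe v2 algorithm -->
  <name>mapreduce.fileoutputcommitter.algorithm.version</name>
  <value>1</value>
</property>
```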
+
+The v1 algorithm is resilient to all forms of task failure, but slow
+when committing the final aggregate output, as it renames each newly created file
+to the correct place in the table one by one.
+
+The v2 algorithm is not considered safe because the output is visible when individual
+tasks commit, rather than being delayed until job commit.
+It is possible for multiple task attempts to get their data into the output
+directory tree, and if a job fails/is aborted before the job is committed,
+the output is visible.
+
+## File Output Committer V1 and V2
+
+### File Output Committer V1 and V2 Commit algorithms
+
+#### Task attempt execution (V1 and V2)
+
+The job attempt directory, `$dest/__temporary/$jobAttemptId/`, contains all output
+of the job in progress; every task attempt is allocated its own task attempt dir,
+`$dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId`.
+
+All work for a task is written under the task attempt directory. If the output
+is a deep tree with files at the root, the task attempt dir will end up with a
+similar structure, with the files it has generated and the directories above
+them.
+
+### MapReduce V1 algorithm:
+
+#### v1 Task commit
+
+The task attempt dir is renamed directly underneath the job attempt dir:
+
+```
+rename(
+  $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId,
+  $dest/__temporary/$jobAttemptId/$taskId)
+```
+
+#### V1 Job Commit
+
+For each committed task, all files underneath are renamed into the destination
+directory, with each filename relative to the base directory of the task remapped
+to a path under the destination dir.
+
+That is, everything under `$dest/__temporary/$jobAttemptId/$taskId` is converted
+to a path under `$dest`.
+
+A recursive treewalk identifies the paths to rename in each TA directory.
+There's some optimisation if the task directory tree contains a subdirectory
+which does not exist under the destination: in this case the whole
+directory can be renamed. If the directory already exists, a file-by-file merge
+takes place for that dir, with the action for subdirectories again depending on
+the presence of the destination.
+
+As a result, if the output of each task goes to a separate final directory (e.g. the
+final partition is unique to a single task), the rename is O(1) for the dir,
+irrespective of children. If the output is to be in the same dir as other
+tasks (or updating existing directories), then the rename performance becomes
+O(files).
+
+Finally, a 0-byte `_SUCCESS` file is written iff `mapreduce.fileoutputcommitter.marksuccessfuljobs` is true.
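[Editorial aside, not part of the PR diff: to make the treewalk concrete, here is a hedged sketch of the merge logic described above. It is a simplified illustration, not the actual `FileOutputCommitter` code.]

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Simplified illustration of the V1 job-commit merge: rename whole
// directories when the destination is absent, otherwise recurse and
// merge the children one by one.
final class V1MergeSketch {
  static void merge(FileSystem fs, FileStatus src, Path dst) throws IOException {
    if (src.isFile()) {
      fs.delete(dst, true);            // clobber any existing file at dst
      fs.rename(src.getPath(), dst);   // one rename per file
    } else if (!fs.exists(dst)) {
      fs.rename(src.getPath(), dst);   // O(1): rename the whole directory
    } else {
      for (FileStatus child : fs.listStatus(src.getPath())) {
        merge(fs, child, new Path(dst, child.getPath().getName()));
      }
    }
  }
}
```

This is why a task whose output lands in a directory unique to it commits in one rename, while output merged into directories shared with other tasks degrades to one rename per file.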
+
+### MapReduce V2 algorithm:
+
+#### V2 Task commit
+
+The files under the task attempt dir are renamed one by one into the destination
+directory. There's no attempt at optimising directory renaming, because other
+tasks may be committing their work at the same time. It is therefore `O(files)` +
+the cost of listing the directory tree. Again: done with a recursive treewalk,
+not a deep `listFiles(path, recursive=true)` API, which would be faster on HDFS
+and (though not relevant here) S3.
+
+#### V2 Job Commit
+
+A 0-byte `_SUCCESS` file is written iff `mapreduce.fileoutputcommitter.marksuccessfuljobs`
+is true.
+
+
+
+### Why the V2 committer is incorrect/unsafe
+
+If, for a Task T1, Task Attempt 1 (T1A1) fails before committing, the driver
+will schedule a new attempt "T1A2", and commit it. All is good.
+
+But: if T1A1 was given permission to commit and it failed during the commit
+process, some of its output may have been written to the destination directory.
+
+If attempt T1A2 was then told to commit, then if and only if its output had the
+exact set of file names would any already-renamed files be overwritten. If
+different filenames were generated, then the output would contain files of T1A1
+and T1A2.
+
+If T1A1 became partitioned during the commit process, then the job committer
+would schedule another attempt and commit its work. However, if T1A1 still had
+connectivity to the filesystem, it could still be renaming files. The output of
+the two tasks could be intermingled even if the same filenames were used.
+
+## Background: the S3A Committers
+
+The paper, [_A Zero-Rename Committer_](https://github.com/steveloughran/zero-rename-committer/releases/),
+Loughran et. al., covers these committers.
+
+It also describes the commit problem, defines correctness, and describes the
+algorithms of the v1 and v2 committers, as well as those of the S3A committers,
+the IBM Stocator committer and what we know of EMR's Spark committer.
+
+The `hadoop-aws` JAR contains a pair of committers, "Staging" and "Magic". Both
+of these are implementations of the same problem: safely and rapidly committing
+work to an S3 object store.
+
+The committers take advantage of the fact that S3 offers an atomic way to create
+a file: the PUT request.
+
+Files either exist or they don't. A file can be uploaded direct to its
+destination, and it is only when the upload completes that the file is manifest,
+overwriting any existing copy.
+
+For large files, a multipart upload allows this upload operation to be split
+into a series of POST requests:
+
+1. `initiate-upload (path -> upload ID)`
+1. `upload part(path, upload ID, data[]) -> checksum.`
+   This can be parallelised. Up to 10,000 parts can be uploaded to a single
+   object. All but the final part must be >= 5MB.
+1. `complete-upload (path, upload ID, List<checksum>)`
+   This manifests the file, building it from the parts in the sequence of blocks
+   defined by the ordering of the checksums.
+
+The secret for the S3A committers is that the final POST request can be delayed
+until the job commit phase, even though the files are uploaded during task
+attempt execution/commit. The task attempts need to determine the final
+destination of each file, upload the data as part of a multipart operation, then
+save the information needed to complete the upload in a file which is later read
+by the job committer and used in a POST request.
+
+### Staging Committer
+
+The _Staging Committer_ is based on the contribution by Ryan Blue of Netflix.
+It relies on HDFS to be the consistent store to propagate the `.pendingset` files.
+
+The working directory of each task attempt is in the local filesystem, "the
+staging directory". The information needed to complete the uploads is passed
+from Task Attempts to the Job Committer by using a v1 FileOutputCommitter
+working with the cluster HDFS filesystem. This ensures that the committer has
+the same correctness guarantees as the v1 algorithm.
+
+1. Task commit consists of uploading all files under the local filesystem's task
+   attempt working directory to their final destination path, holding back on
+   the final manifestation POST.
+1. A JSON file containing all information needed to complete the upload of all
+   files in the task attempt is written to the Job Attempt directory of the
+   wrapped committer working with HDFS.
+1. Job commit: load in all the manifest files in the HDFS job attempt directory,
+   then issue the POST requests to complete the uploads. These are parallelised.
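[Editorial aside, not part of the PR diff: a hedged sketch of the shape of that parallelised job-commit step. All names here are placeholders; `completeUpload()` stands in for the final multipart-upload POST and is not a real Hadoop or S3A API.]

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only: every pending upload recorded in the task
// manifests has its completion request issued from a worker pool, so job
// commit time scales with pool size rather than being strictly sequential.
public class ParallelCommitSketch {

  static void completeUpload(String pendingUpload) {
    // placeholder: issue the complete-upload POST for one file
  }

  static void commitJob(List<String> pendingUploads) {
    ExecutorService pool = Executors.newFixedThreadPool(16);
    try {
      CompletableFuture.allOf(
          pendingUploads.stream()
              .map(p -> CompletableFuture.runAsync(() -> completeUpload(p), pool))
              .toArray(CompletableFuture[]::new))
          .join();  // propagates the first failure, if any
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    commitJob(Arrays.asList("part-0000.pending", "part-0001.pending"));
  }
}
```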
+
+
+### The Magic Committer
+
+The _Magic Committer_ is purely-S3A and takes advantage of
+the fact that the authors could make changes within the filesystem client itself.
+
+"Magic" paths are defined which, when opened for writing under, initiate a
+multipart upload to the final destination directory. When the output stream is
+`close()`d, a zero byte marker file is written to the magic path, and a JSON
+`.pending` file containing all the information needed to complete the upload is
+saved.
+
+Task commit:
+
+1. List all `.pending` files under each task attempt's magic directory;
+1. Aggregate to a `.pendingset` file;
+1. Save to the job attempt directory with the task ID.
+
+Job commit:
+
+1. List `.pendingset` files in the job attempt directory;
+1. Complete the uploads with POST requests.
+
+The Magic committer absolutely requires a consistent S3 store, originally with
+S3Guard. Now that S3 is consistent, raw S3 can be used. It does not need HDFS
+or any other filesystem with `rename()`.
+
+### Correctness
+
+The S3A committer is considered correct because
+
+1. Nothing is materialized until job commit.
+1. Only one task attempt's manifest can be saved to the job attempt directory.
+   Hence: only one TA's files of the same task ID are exclusively committed.
+1. The staging committer's use of HDFS to pass manifests from TAs to the Job
+   committer ensures that S3's eventual consistency would not cause manifests to
+   be missed.
+1. Until S3 was consistent, the magic committer relied on S3Guard to provide the
+   list consistency needed during both task- and job-commit.
+1. The authors and wider community fixed all the issues related to the committers
+   which have surfaced in production.
+
+Significant issues which were fixed include:
+
+* [HADOOP-15961](https://issues.apache.org/jira/browse/HADOOP-15961).
+  S3A committers: make sure there's regular progress() calls.
+* [HADOOP-16570](https://issues.apache.org/jira/browse/HADOOP-16570).
+  S3A committers encounter scale issues.
+* [HADOOP-16798](https://issues.apache.org/jira/browse/HADOOP-16798).
+  S3A Committer thread pool shutdown problems.
+* [HADOOP-17112](https://issues.apache.org/jira/browse/HADOOP-17112).
+  S3A committers can't handle whitespace in paths.
+* [HADOOP-17318](https://issues.apache.org/jira/browse/HADOOP-17318).
+  Support concurrent S3A commit jobs with same app attempt ID.
+* [HADOOP-17258](https://issues.apache.org/jira/browse/HADOOP-17258).
+  MagicS3GuardCommitter fails with `pendingset` already exists.
+* [HADOOP-17414](https://issues.apache.org/jira/browse/HADOOP-17414).
+  Magic committer files don't have the count of bytes written collected by Spark.
+* [SPARK-33230](https://issues.apache.org/jira/browse/SPARK-33230)
+  Hadoop committers to get unique job ID in `spark.sql.sources.writeJobUUID`.
+* [SPARK-33402](https://issues.apache.org/jira/browse/SPARK-33402)
+  Jobs launched in same second have duplicate MapReduce JobIDs.
+* [SPARK-33739](https://issues.apache.org/jira/browse/SPARK-33739).
+  Jobs committed through the S3A Magic committer don't report
+  the bytes written (depends on HADOOP-17414).
+
+Of those which affected the correctness rather than scale/performance/UX:
+HADOOP-17258 involved the recovery from a failure after TA1 task commit had
+completed, but had failed to report in.
+SPARK-33402, SPARK-33230 and
+HADOOP-17318 are all related: if two Spark jobs/stages started in the
+same second, they had the same job ID. This caused the HDFS directories used by
+the staging committers to be intermingled.
+
+What is notable is this: these are all problems which the minimal integration
+test suites did not or discover.

Review comment:
   Missing word after "not", or simply:
```suggestion
test suites did not discover.
```



##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md
##########
@@ -0,0 +1,335 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+
+# Manifest Committer Architecture
+
+This document describes the architecture and other implementation/correctness
+aspects of the [Manifest Committer](manifest_committer.html).
+
+The protocol and its correctness are covered in
+[Manifest Committer Protocol](manifest_committer_protocol.html).
+
+<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
+
+The _Manifest_ committer is a committer for work which provides performance on ABFS for "real world"
+queries, and performance and correctness on GCS.
+
+This committer uses the extension point which came in for the S3A committers.
+Users can declare a new committer factory for `abfs://` and `gcs://` URLs.
+It can be used through Hadoop MapReduce and Apache Spark.
+
+## Background
+
+### Terminology
+
+| Term | Meaning|
+|------|--------|
+| Committer | A class which can be invoked by MR Spark to perform the task and job commit operations. |
+| Spark Driver | The spark process scheduling the work and choreographing the commit operation.|
+| Job: in MapReduce | The entire application. In spark, this is a single stage in a chain of work |

Review comment:
```suggestion
| Job | In MapReduce. the entire application. In spark, this is a single stage in a chain of work |
```



##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md
##########
@@ -0,0 +1,567 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+
+# The Manifest Committer for Azure and Google Cloud Storage
+
+This document describes how to use the _Manifest Committer_.
+
+The _Manifest_ committer is a committer for work which provides
+performance on ABFS for "real world" queries,
+and performance and correctness on GCS.
+
+The architecture and implementation of the committer is covered in
+[Manifest Committer Architecture](manifest_committer_architecture.html).
+
+
+The protocol and its correctness are covered in
+[Manifest Committer Protocol](manifest_committer_protocol.html).
+
+<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
+
+## Problem:
+
+The only committer of work from Spark to Azure ADLS Gen 2 "abfs://" storage
+which is safe to use is the "v1 file committer".
+
+This is "correct" in that if a task attempt fails, its output is guaranteed not
+to be included in the final output. The "v2" commit algorithm cannot meet that
+guarantee, which is why it is no longer the default.
+
+But: it is slow, especially on jobs where deep directory trees of output are used.
+Why is it slow? It's hard to point at a particular cause, primarily because of
+the lack of any instrumentation in the `FileOutputCommitter`.
+Stack traces of running jobs generally show `rename()`, though list operations
+do surface too.
+
+On Google GCS, neither the v1 nor the v2 algorithm is _safe_ because the Google
+filesystem doesn't have the atomic directory rename which the v1 algorithm
+requires.
+
+A further issue is that both Azure and GCS storage may encounter scale issues
+with deleting directories with many descendants.
+This can trigger timeouts because the FileOutputCommitter assumes that
+cleaning up after the job is a fast call to `delete("_temporary", true)`.
+
+## Solution.
+
+The _Intermediate Manifest_ committer is a new committer for
+work which should deliver performance on ABFS
+for "real world" queries, and performance and correctness on GCS.
+
+This committer uses the extension point which came in for the S3A committers.
+Users can declare a new committer factory for abfs:// and gcs:// URLs.
+A suitably configured Spark deployment will pick up the new committer.
+
+Directory performance issues in job cleanup can be addressed by two options:
+1. The committer can be configured to move the temporary directory under `~/.trash`
+   in the cluster FS. This may benefit Azure, but will not benefit GCS.
+1. The committer will parallelize deletion of task attempt directories before
+   deleting the `_temporary` directory.
+   This is highly beneficial on GCS; may or may not be beneficial on ABFS.
+   (More specifically: use it if you are using OAuth to authenticate).
+
+Suitably configured MR and Spark deployments will pick up the new committer.
+
+The committer can be used with any filesystem client which has a "real" file rename:
+only one process may rename a given file, and if the destination already exists,
+the caller is notified.
+
+
+
+# <a name="how"></a> How it works
+
+The full details are covered in [Manifest Committer Architecture](manifest_committer_architecture.html).
+
+# <a name="use"></a> Using the committer
+
+The hooks put in to support the S3A committers were designed to allow every
+filesystem schema to provide their own committer.
+See [Switching To an S3A Committer](../../hadoop-aws/tools/hadoop-aws/committers.html#Switching_to_an_S3A_Committer)
+
+A factory for the abfs schema would be defined in
+`mapreduce.outputcommitter.factory.scheme.abfs`; and a similar one for `gcs`.
+
+Some matching Spark configuration changes, especially for the Parquet binding, will be required.
+These can be done in `core-site.xml`, if it is not defined in the `mapred-default.xml` JAR.
+
+
+```xml
+<property>
+  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
+  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
+</property>
+<property>
+  <name>mapreduce.outputcommitter.factory.scheme.gs</name>
+  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory</value>
+</property>
+```
+
+## Binding to the manifest committer in Spark.
+
+In Apache Spark, the `spark-default` configuration needs to switch to using the committer factory
+mechanism to instantiate committers. This includes configuring Parquet with a subclass of the parquet

Review comment:
```suggestion
In Apache Spark, the configuration can be done either with command line options (after the '--conf') or by
using the `spark-defaults.conf` file. The following is an example of using `spark-defaults.conf` also including
the configuration for Parquet with a subclass of the parquet
```


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 739519)
    Time Spent: 28h 10m  (was: 28h)

> Add a task-manifest output committer for Azure and GCS
> ------------------------------------------------------
>
>                 Key: MAPREDUCE-7341
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7341
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: client
>    Affects Versions: 3.3.1
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 28h 10m
>  Remaining Estimate: 0h
>
> Add a task-manifest output committer for Azure and GCS
> The S3A committers are very popular in Spark on S3, as they are both correct
> and fast.
> The classic FileOutputCommitter v1 and v2 algorithms are all that is
> available for Azure ABFS and Google GCS, and they have limitations.
> The v2 algorithm isn't safe in the presence of failed task attempt commits,
> so we recommend the v1 algorithm for Azure. But that is slow because it
> sequentially lists then renames files and directories, one-by-one. The
> latencies of list and rename make things slow.
> Google GCS lacks the atomic directory rename required for v1 correctness;
> v2 can be used (which doesn't have the job commit performance limitations),
> but it's not safe.
> Proposed:
> * Add a new FileOutputFormat committer which uses an intermediate manifest to
>   pass the list of files created by a TA to the job committer.
> * Job committer to parallelise reading these task manifests and submit all the
>   rename operations into a pool of worker threads. (also: mkdir, directory
>   deletions on cleanup)
> * Use the committer plugin mechanism added for s3a to make this the default
>   committer for ABFS (i.e. no need to make any changes to FileOutputCommitter)
> * Add lots of IOStatistics instrumentation + logging of operations in the
>   JobCommit for visibility of where delays are occurring.
> * Reuse the S3A committer _SUCCESS JSON structure to publish IOStats & other
>   data for testing/support.
> This committer will be faster than the V1 algorithm because of the
> parallelisation, and because a manifest written by create-and-rename will be
> exclusive to a single task attempt, it delivers the isolation which the v2
> committer lacks.
> This is not an attempt to do an iceberg/hudi/delta-lake style manifest-only
> format for describing the contents of a table; the final output is still a
> directory tree which must be scanned during query planning.
> As such the format is still suboptimal for cloud storage, but at least we
> will have faster job execution during the commit phases.
>
> Note: this will also work on HDFS, where again, it should be faster than
> the v1 committer. However the target is very much Spark with ABFS and GCS; no
> plans to worry about MR as that simplifies the challenge of dealing with job
> restart (i.e. you don't have to)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org