[ 
https://issues.apache.org/jira/browse/MAPREDUCE-7341?focusedWorklogId=739797&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-739797
 ]

ASF GitHub Bot logged work on MAPREDUCE-7341:
---------------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Mar/22 01:08
            Start Date: 11/Mar/22 01:08
    Worklog Time Spent: 10m 
      Work Description: sidseth commented on a change in pull request #2971:
URL: https://github.com/apache/hadoop/pull/2971#discussion_r824277335



##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.Trash;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+
+/**
+ * Implementation of manifest store operations through the filesystem API.
+ * {@link #moveToTrash(String, Path)} uses the normal Trash classes.
+ * resilient commit is not available.
+ * This class is subclassed in the ABFS module, which does add the resilient
+ * commit method.
+ */
+@InterfaceAudience.LimitedPrivate("mapreduce, object-stores")
+@InterfaceStability.Unstable
+public class ManifestStoreOperationsThroughFileSystem extends 
ManifestStoreOperations {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ManifestStoreOperationsThroughFileSystem.class);
+
+  /**
+   * Trash.moveToTrash() returned false.
+   */
+  public static final String E_TRASH_FALSE = "Failed to rename to trash" +
+      " -check trash interval in " + FS_TRASH_INTERVAL_KEY +": ";
+
+  /**
+   * Filesystem; set in {@link #bindToFileSystem(FileSystem, Path)}.
+   */
+  private FileSystem fileSystem;
+
+  /**
+   * Has a call to msync failed as unsupported?
+   */
+  private boolean msyncUnsupported = false;
+
+  /**
+   * Direct Constructor.
+   * @param fileSystem filesystem to write through.
+   */
+  public ManifestStoreOperationsThroughFileSystem(final FileSystem fileSystem) 
{
+    this.fileSystem = fileSystem;
+  }
+
+  /**
+   * Constructor used for introspection-based binding.
+   */
+  public ManifestStoreOperationsThroughFileSystem() {
+  }
+
+  @Override
+  public void close() throws IOException {
+    /* no-op; FS is assumed to be shared. */
+
+  }
+
+  /**
+   * Get the filesystem.
+   * @return the filesystem; null until bound.
+   */
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  @Override
+  public void bindToFileSystem(FileSystem filesystem, Path path) throws 
IOException {
+    fileSystem = filesystem;
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    return fileSystem.getFileStatus(path);
+  }
+
+  /**
+   * Using FileSystem.isFile to offer stores the option to optimize their 
probes.
+   * @param path path to probe
+   * @return true if the path resolves to a file.
+   * @throws IOException IO failure.
+   */
+  @SuppressWarnings("deprecation")
+  @Override
+  public boolean isFile(Path path) throws IOException {
+    return fileSystem.isFile(path);
+  }
+
+  @Override
+  public boolean delete(Path path, boolean recursive)
+      throws IOException {
+    return fileSystem.delete(path, recursive);
+  }
+
+  @Override
+  public boolean mkdirs(Path path)
+      throws IOException {
+    return fileSystem.mkdirs(path);
+  }
+
+  @Override
+  public boolean renameFile(Path source, Path dest)
+      throws IOException {
+    return fileSystem.rename(source, dest);
+  }
+
+  @Override
+  public RemoteIterator<FileStatus> listStatusIterator(Path path)
+      throws IOException {
+    return fileSystem.listStatusIterator(path);
+  }
+
+  @Override
+  public TaskManifest loadTaskManifest(
+      JsonSerialization<TaskManifest> serializer,
+      FileStatus st) throws IOException {
+    return TaskManifest.load(serializer, fileSystem, st.getPath(), st);
+  }
+
+  @Override
+  public <T extends AbstractManifestData<T>> void save(
+      final T manifestData,
+      final Path path,
+      final boolean overwrite) throws IOException {
+    manifestData.save(fileSystem, path, overwrite);
+  }
+
+  @Override
+  public boolean isTrashEnabled(Path path) {
+    try {
+      return fileSystem.getServerDefaults(path).getTrashInterval() > 0;

Review comment:
       Will this ever be true? ABFS at least doesn't seem to implement 
getServerDefaults / derivatives.

##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Public constants for the manifest committer.
+ * This includes all configuration options and their default values.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class ManifestCommitterConstants {
+
+  /**
+   * Suffix to use in manifest files in the job attempt dir.
+   * Value: {@value}.
+   */
+  public static final String MANIFEST_SUFFIX = "-manifest.json";
+
+  /**
+   * Prefix for summary files in the report dir. Call
+   */
+  public static final String SUMMARY_FILENAME_PREFIX = "summary-";
+
+  /**
+   * Format string used to build a summary file from a Job ID.
+   */
+  public static final String SUMMARY_FILENAME_FORMAT =
+      SUMMARY_FILENAME_PREFIX + "%s.json";
+
+  /**
+   * Suffix to use for temp files before renaming them.
+   * Value: {@value}.
+   */
+  public static final String TMP_SUFFIX = ".tmp";
+
+  /**
+   * Initial number of all app attempts.
+   * This is fixed in YARN; for Spark jobs the
+   * same number "0" is used.
+   */
+  public static final int INITIAL_APP_ATTEMPT_ID = 0;
+
+  /**
+   * Format string for building a job dir.
+   * Value: {@value}.
+   */
+  public static final String JOB_DIR_FORMAT_STR = "manifest_%s";
+
+  /**
+   * Format string for building a job attempt dir.
+   * This uses the job attempt number so previous versions
+   * can be found trivially.
+   * Value: {@value}.
+   */
+  public static final String JOB_ATTEMPT_DIR_FORMAT_STR = "%d";
+
+  /**
+   * Name of directory under job attempt dir for manifests.
+   */
+  public static final String JOB_TASK_MANIFEST_SUBDIR = "manifests";
+
+  /**
+   * Name of directory under job attempt dir for task attempts.
+   */
+  public static final String JOB_TASK_ATTEMPT_SUBDIR = "tasks";
+
+
+  /**
+   * Committer classname as recorded in the committer _SUCCESS file.
+   */
+  public static final String MANIFEST_COMMITTER_CLASSNAME =
+      
"org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter";
+
+  /**
+   * Marker file to create on success: {@value}.
+   */
+  public static final String SUCCESS_MARKER = "_SUCCESS";
+
+  /** Default job marker option: {@value}. */
+  public static final boolean DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER = true;
+
+  /**
+   * The limit to the number of committed objects tracked during
+   * job commits and saved to the _SUCCESS file.
+   * Value: {@value}.
+   */
+  public static final int SUCCESS_MARKER_FILE_LIMIT = 100;
+
+  /**
+   * The UUID for jobs: {@value}.
+   * This was historically created in Spark 1.x's SQL queries,
+   * but "went away".
+   * It has been restored in recent spark releases.
+   * If found: it is used instead of the MR job attempt ID.
+   */
+  public static final String SPARK_WRITE_UUID = 
"spark.sql.sources.writeJobUUID";
+
+  /**
+   * String to use as source of the job ID.
+   * This SHOULD be kept in sync with that of
+   * {@code AbstractS3ACommitter.JobUUIDSource}.
+   * Value: {@value}.
+   */
+  public static final String JOB_ID_SOURCE_MAPREDUCE = "JobID";
+
+  /**
+   * Prefix to use for config options: {@value}.
+   */
+  public static final String OPT_PREFIX = "mapreduce.manifest.committer.";
+
+  /**
+   * Rather than delete in cleanup, should the working directory
+   * be moved to the trash directory?
+   * Potentially faster on some stores.
+   * Value: {@value}.
+   */
+  public static final String OPT_CLEANUP_MOVE_TO_TRASH =

Review comment:
       Left a comment about trash usage elsewhere, and how the config parameter 
ends up being ignored / irrelevant. Guessing I'm missing something there.

##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableManifestStoreOperations.java
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.OPERATION_TIMED_OUT;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.E_TRASH_FALSE;
+
+/**
+ * Wrap an existing {@link ManifestStoreOperations} implementation and fail on
+ * specific paths.
+ * This is for testing. It could be implemented via
+ * Mockito 2 spy code but is not so that:
+ * 1. It can be backported to Hadoop versions using Mockito 1.x.
+ * 2. It can be extended to use in production. This is why it is in
+ * the production module -to allow for downstream tests to adopt it.
+ * 3. You can actually debug what's going on.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class UnreliableManifestStoreOperations extends ManifestStoreOperations 
{

Review comment:
       Should be under 'test', instead of alongside the main code.

##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java
##########
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * FileSystem operations which are needed to generate the task manifest.
+ * The specific choice of which implementation to use is configurable.
+ * Object store implementations MAY subclass if they
+ * need to implement resilient commit operations.
+ * However, the actual API MUST NOT be used outside
+ * the manifest committer and its tests.
+ */
+@InterfaceAudience.LimitedPrivate("mapreduce, object-stores")

Review comment:
       Apologies for repeating this all over the place ... object-stores should 
not be depending on this / implementing this in their own packages (i.e. no MR 
dependencies for people using object stores).

##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java
##########
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+import static 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED;
+import static 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT;
+import static 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED;
+import static 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_MOVE_TO_TRASH;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_MOVE_TO_TRASH_DEFAULT;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_ATTEMPT_DIRS;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_ATTEMPT_DIRS_DEFAULT;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
+
+/**
+ * Clean up a job's temporary directory through parallel delete,
+ * base _temporary delete and as a fallback, rename to trash.
+ * Returns: the outcome of the overall operation and any move to trash.
+ * The result is detailed purely for the benefit of tests, which need
+ * to make assertions about error handling and fallbacks.
+ */
+public class CleanupJobStage extends
+    AbstractJobCommitStage<
+        CleanupJobStage.Arguments,
+        CleanupJobStage.Result> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CleanupJobStage.class);
+
+  /**
+   * Count of deleted directories.
+   */
+  private final AtomicInteger deleteDirCount = new AtomicInteger();
+
+  /**
+   * Count of delete failures.
+   */
+  private final AtomicInteger deleteFailureCount = new AtomicInteger();
+
+  /**
+   * Last delete exception; non null if deleteFailureCount is not zero.
+   */
+  private IOException lastDeleteException = null;
+
+  /**
+   * Stage name as passed in from arguments.
+   */
+  private String stageName = OP_STAGE_JOB_CLEANUP;
+
+  public CleanupJobStage(final StageConfig stageConfig) {
+    super(false, stageConfig, OP_STAGE_JOB_CLEANUP, true);
+  }
+
+  /**
+   * Statistic name is extracted from the arguments.
+   * @param arguments args to the invocation.
+   * @return stage name.
+   */
+  @Override
+  protected String getStageStatisticName(Arguments arguments) {
+    return arguments.statisticName;
+  }
+
+  /**
+   * Clean up the job attempt directory tree.
+   * @param args arguments built up.
+   * @return the result.
+   * @throws IOException failure was raised an exceptions weren't surpressed.
+   */
+  @Override
+  protected Result executeStage(
+      final Arguments args)
+      throws IOException {
+    stageName = getStageName(args);
+    // this is $dest/_temporary
+    final Path baseDir = 
requireNonNull(getStageConfig().getOutputTempSubDir());
+    LOG.debug("{}: Cleaup of directory {} with {}", getName(), baseDir, args);
+    if (!args.enabled) {
+      LOG.info("{}: Cleanup of {} disabled", getName(), baseDir);
+      return new Result(Outcome.DISABLED, baseDir,
+          0, null, null);
+    }
+    // shortcut of a single existence check before anything else
+    if (getFileStatusOrNull(baseDir) == null) {
+      return new Result(Outcome.NOTHING_TO_CLEAN_UP,
+          baseDir,
+          0, null, null);
+    }
+
+    // move to trash?
+    // this will be set if delete fails.
+    boolean moveToTrash = args.moveToTrash;
+
+    // delete
+    final boolean attemptDelete = !moveToTrash;
+
+    Outcome outcome = null;
+    IOException exception = null;
+
+    if (attemptDelete) {
+      // to delete.
+      LOG.info("{}: Deleting job directory {}", getName(), baseDir);
+
+      if (args.deleteTaskAttemptDirsInParallel) {
+        // Attempt to do a parallel delete of task attempt dirs;
+        // don't overreact if a delete fails, but stop trying
+        // to delete the others, and fall back to deleting the
+        // job dir.
+        Path taskSubDir
+            = getStageConfig().getJobAttemptTaskSubDir();
+        try (DurationInfo info = new DurationInfo(LOG,
+            "parallel deletion of task attempts in %s",
+            taskSubDir)) {
+          RemoteIterator<FileStatus> dirs =
+              RemoteIterators.filteringRemoteIterator(
+                  listStatusIterator(taskSubDir),
+                  FileStatus::isDirectory);
+          TaskPool.foreach(dirs)
+              .executeWith(getIOProcessors())
+              .stopOnFailure()
+              .suppressExceptions(false)
+              .run(this::rmTaskAttemptDir);
+          getIOStatistics().aggregate((retrieveIOStatistics(dirs)));
+
+          if (getLastDeleteException() != null) {
+            // one of the task attempts failed.
+            throw getLastDeleteException();
+          }
+          // success: record this as the outcome.
+          outcome = Outcome.PARALLEL_DELETE;
+        } catch (FileNotFoundException ex) {
+          // not a problem if there's no dir to list.
+          LOG.debug("{}: Task attempt dir {} not found", getName(), 
taskSubDir);
+          outcome = Outcome.DELETED;
+        } catch (IOException ex) {
+          // failure. Log and continue
+          LOG.info("{}: Exception while listing/deleting task attempts under 
{}; continuing",
+              getName(),
+              taskSubDir, ex);
+          // not overreacting here as the base delete will still get executing
+          outcome = Outcome.DELETED;
+        }
+      }
+      // Now the top-level deletion; exception gets saved
+      exception = deleteOneDir(baseDir);
+      if (exception != null) {
+        // failure, report and continue
+        LOG.warn("{}: Deleting job directory {} failed: moving to trash", 
getName(), baseDir);
+        moveToTrash = true;

Review comment:
       This shouldn't be automatically enabled, right? Especially in the 
absence of any entity to go and clean-up this data at a later point.

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem;
+
+/**
+ * Extension of StoreOperationsThroughFileSystem with ABFS awareness.
+ * Purely for use by jobs committing work through the manifest committer.
+ * The {@link AzureManifestCommitterFactory} will configure the
+ * configure to use this as the binding to the FS.
+ *
+ * ADLS Gen2 stores support etag-recovery on renames, but not WASB
+ * stores.
+ */
+@InterfaceAudience.LimitedPrivate("mapreduce")
+@InterfaceStability.Unstable
+public class AbfsManifestStoreOperations extends
+    ManifestStoreOperationsThroughFileSystem {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsManifestStoreOperations.class);
+
+  /**
+   * Classname, which can be declared in jpb configurations.
+   */
+  public static final String NAME = 
AbfsManifestStoreOperations.class.getName();
+
+  /**
+   * Resilient rename calls; only available on an ADLS Gen2 store.
+   * Will be null after binding if the FS isn't compatible.
+   */
+  private ResilientCommitByRename resilientCommitByRename;
+
+  @Override
+  public AzureBlobFileSystem getFileSystem() {
+    return (AzureBlobFileSystem) super.getFileSystem();
+  }
+
+  /**
+   * Bind to the store.
+   *
+   * @param filesystem FS.
+   * @param path path to work under
+   * @throws IOException binding problems.
+   */
+  @Override
+  public void bindToFileSystem(FileSystem filesystem, Path path) throws 
IOException {
+    if (!(filesystem instanceof AzureBlobFileSystem)) {
+      throw new PathIOException(path.toString(),
+          "Not an abfs filesystem: " + filesystem.getClass());
+    }
+    super.bindToFileSystem(filesystem, path);
+    try {
+      resilientCommitByRename = 
getFileSystem().createResilientCommitSupport(path);
+      LOG.debug("Bonded to filesystem with resilient commits under path {}", 
path);
+    } catch (UnsupportedOperationException e) {
+      LOG.debug("No resilient commit support under path {}", path);
+    }
+  }
+
+  @Override
+  public boolean storePreservesEtagsThroughRenames(final Path path) {

Review comment:
       Not needed. This is already checked from the FileSystem itself.

##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/EntryStatus.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.files;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * Status of a file or dir entry, designed to be marshalled as
+ * an integer -the ordinal value of the enum is the
+ * wire value.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public enum EntryStatus {
+
+  unknown,
+  not_found,
+  file,
+  dir,
+  created_dir;
+
+  /**
+   * Go from a marshalled type to a status value.
+   * Any out of range value is converted to unknown.
+   * @param type type
+   * @return the status value.
+   */
+  public static EntryStatus toEntryStatus(int type) {
+    switch (type) {
+    case 1:
+      return not_found;
+    case 2:
+      return file;
+    case 3:
+      return dir;
+    case 4:
+      return created_dir;
+    case 0:
+    default:
+      return unknown;
+    }
+  }
+
+
+  /**
+   * Go from the result of a getFileStatus call or
+   * listing entry to a status.
+   * A null argument is mapped to {@link #not_found}
+   * @param st file status
+   * @return the status enum.
+   */
+  public static EntryStatus toEntryStatus(@Nullable FileStatus st) {
+
+    if (st == null) {
+      return not_found;
+    }
+    if (st.isDirectory()) {
+      return dir;
+    }
+    if (st.isFile()) {
+      return file;

Review comment:
       Would a DirEntry with type = 'file' be valid? Likewise for a FileEntry.
   Should there be a sanity validation in DirEntry.dirEntry / new FileEntry

##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java
##########
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+import static 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED;
+import static 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT;
+import static 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED;
+import static 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_MOVE_TO_TRASH;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_MOVE_TO_TRASH_DEFAULT;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_ATTEMPT_DIRS;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_ATTEMPT_DIRS_DEFAULT;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
+
+/**
+ * Clean up a job's temporary directory through parallel delete,
+ * base _temporary delete and as a fallback, rename to trash.
+ * Returns: the outcome of the overall operation and any move to trash.
+ * The result is detailed purely for the benefit of tests, which need
+ * to make assertions about error handling and fallbacks.
+ */
+public class CleanupJobStage extends
+    AbstractJobCommitStage<
+        CleanupJobStage.Arguments,
+        CleanupJobStage.Result> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CleanupJobStage.class);
+
+  /**
+   * Count of deleted directories.
+   */
+  private final AtomicInteger deleteDirCount = new AtomicInteger();
+
+  /**
+   * Count of delete failures.
+   */
+  private final AtomicInteger deleteFailureCount = new AtomicInteger();
+
+  /**
+   * Last delete exception; non null if deleteFailureCount is not zero.
+   */
+  private IOException lastDeleteException = null;
+
+  /**
+   * Stage name as passed in from arguments.
+   */
+  private String stageName = OP_STAGE_JOB_CLEANUP;
+
+  public CleanupJobStage(final StageConfig stageConfig) {
+    super(false, stageConfig, OP_STAGE_JOB_CLEANUP, true);
+  }
+
+  /**
+   * Statistic name is extracted from the arguments.
+   * @param arguments args to the invocation.
+   * @return stage name.
+   */
+  @Override
+  protected String getStageStatisticName(Arguments arguments) {
+    return arguments.statisticName;
+  }
+
+  /**
+   * Clean up the job attempt directory tree.
+   * @param args arguments built up.
+   * @return the result.
+   * @throws IOException failure was raised an exceptions weren't surpressed.
+   */
+  @Override
+  protected Result executeStage(
+      final Arguments args)
+      throws IOException {
+    stageName = getStageName(args);
+    // this is $dest/_temporary
+    final Path baseDir = 
requireNonNull(getStageConfig().getOutputTempSubDir());
+    LOG.debug("{}: Cleaup of directory {} with {}", getName(), baseDir, args);
+    if (!args.enabled) {
+      LOG.info("{}: Cleanup of {} disabled", getName(), baseDir);
+      return new Result(Outcome.DISABLED, baseDir,
+          0, null, null);
+    }
+    // shortcut of a single existence check before anything else
+    if (getFileStatusOrNull(baseDir) == null) {
+      return new Result(Outcome.NOTHING_TO_CLEAN_UP,
+          baseDir,
+          0, null, null);
+    }
+
+    // move to trash?
+    // this will be set if delete fails.
+    boolean moveToTrash = args.moveToTrash;

Review comment:
       This gets quite confusing. There's an argument to specify whether to 
move to trash, and subsequently there's a check on whether the filesystem has 
trash enabled - which I think ends up overriding this parameter?
   
   If the FileSystem does have trash enabled - the delete FS operation would 
automatically take care of moving the contents to trash?
   
   So the config parameter can end up being ignored either way
   1) FS enables trash, committer config has moveToTrash set to false <-- 
moveToTrash ignored in favor of the FS having trash enabled (implicitly via the 
delete).
   2) FS disables trash, committer config has moveToTrash set to true <-- 
moveToTrash ignored because the FS does nor support trash.
   3) FS enables trash, committer config has moveToTrash set to true <-- 
Redundant, because the delete will use the trash.
   4) FS disables trash, committer config has moveToTrash set to false <-- 
trash isn't used.
   
   moveToTrash flag seems to not make a difference?
   
   I think I'm missing something in the overall "moveToTrash being implemented 
in the committer" scenario.

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem;
+
+/**
+ * Extension of StoreOperationsThroughFileSystem with ABFS awareness.
+ * Purely for use by jobs committing work through the manifest committer.
+ * The {@link AzureManifestCommitterFactory} will configure the
+ * configure to use this as the binding to the FS.
+ *
+ * ADLS Gen2 stores support etag-recovery on renames, but not WASB
+ * stores.
+ */
+@InterfaceAudience.LimitedPrivate("mapreduce")
+@InterfaceStability.Unstable
+public class AbfsManifestStoreOperations extends

Review comment:
       On avoiding the new dependency on hadoop-mapreduce-client (or the 
reverse where hadoop-mapreduce-client ends up depending on hadoop-cloud-storage 
/ azure specifically)
   
   Does it make sense to make ResilientCommitByRename a little more generic, 
and move it to hadoop-common as a LimitedPrivate("cloud-storage"?)/Unstable.
   AzureBlobFileSystem is the only one to implement the interface right now.
   
   AbfsManifestStoreOperations - can then make decisions based on whether the 
FileSystem implements the interface (and can potentially be merged into 
ManifestStoreOperationsThroughFileSystem itself) / or gets renamed to something 
like ResilientRenameCapableManifestStoreOperationsThroughFileSystem.
   
   All depends on moving ResilientCommitByRename to hadoop-common. I think this 
is quite a bit cleaner than adding the dependencies in either direction.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 739797)
    Time Spent: 29h 40m  (was: 29.5h)

> Add a task-manifest output committer for Azure and GCS
> ------------------------------------------------------
>
>                 Key: MAPREDUCE-7341
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7341
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: client
>    Affects Versions: 3.3.1
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 29h 40m
>  Remaining Estimate: 0h
>
> Add a task-manifest output committer for Azure and GCS
> The S3A committers are very popular in Spark on S3, as they are both correct 
> and fast.
> The classic FileOutputCommitter v1 and v2 algorithms are all that is 
> available for Azure ABFS and Google GCS, and they have limitations. 
> The v2 algorithm isn't safe in the presence of failed task attempt commits, 
> so we
> recommend the v1 algorithm for Azure. But that is slow because it 
> sequentially lists
> then renames files and directories, one-by-one. The latencies of list
> and rename make things slow.
> Google GCS lacks the atomic directory rename required for v1 correctness;
> v2 can be used (which doesn't have the job commit performance limitations),
> but it's not safe.
> Proposed
> * Add a new FileOutputFormat committer which uses an intermediate manifest to
>   pass the list of files created by a TA to the job committer.
> * Job committer to parallelise reading these task manifests and submit all the
>   rename operations into a pool of worker threads. (also: mkdir, directory 
> deletions on cleanup)
> * Use the committer plugin mechanism added for s3a to make this the default 
> committer for ABFS
>   (i.e. no need to make any changes to FileOutputCommitter)
> * Add lots of IOStatistics instrumentation + logging of operations in the 
> JobCommit
>   for visibility of where delays are occurring.
> * Reuse the S3A committer _SUCCESS JSON structure to publish IOStats & other 
> data
>   for testing/support.  
> This committer will be faster than the V1 algorithm because of the 
> parallelisation, and
> because a manifest written by create-and-rename will be exclusive to a single 
> task
> attempt, delivers the isolation which the v2 committer lacks.
> This is not an attempt to do an iceberg/hudi/delta-lake style manifest-only 
> format
> for describing the contents of a table; the final output is still a directory 
> tree
> which must be scanned during query planning.
> As such the format is still suboptimal for cloud storage -but at least we 
> will have
> faster job execution during the commit phases.
>   
> Note: this will also work on HDFS, where again, it should be faster than
> the v1 committer. However the target is very much Spark with ABFS and GCS; no 
> plans to worry about MR as that simplifies the challenge of dealing with job 
> restart (i.e. you don't have to)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org

Reply via email to