[
https://issues.apache.org/jira/browse/MAPREDUCE-7474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842389#comment-17842389
]
ASF GitHub Bot commented on MAPREDUCE-7474:
-------------------------------------------
steveloughran commented on code in PR #6716:
URL: https://github.com/apache/hadoop/pull/6716#discussion_r1584841337
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java:
##########
@@ -582,19 +611,111 @@ protected final Path directoryMustExist(
* Save a task manifest or summary. This will be done by
* writing to a temp path and then renaming.
* If the destination path exists: Delete it.
+ * This will retry so that a rename failure from abfs load or IO errors
+ * will not fail the task.
* @param manifestData the manifest/success file
* @param tempPath temp path for the initial save
* @param finalPath final path for rename.
- * @throws IOException failure to load/parse
+ * @return the manifest saved.
+ * @throws IOException failure to rename after retries.
*/
@SuppressWarnings("unchecked")
- protected final <T extends AbstractManifestData> void save(T manifestData,
+ protected final <T extends AbstractManifestData> T save(
+ final T manifestData,
final Path tempPath,
final Path finalPath) throws IOException {
- LOG.trace("{}: save('{}, {}, {}')", getName(), manifestData, tempPath,
finalPath);
- trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
- operations.save(manifestData, tempPath, true));
- renameFile(tempPath, finalPath);
+ return saveManifest(() -> manifestData, tempPath, finalPath,
OP_SAVE_TASK_MANIFEST);
+ }
+
+ /**
+ * Generate and save a task manifest or summary file.
+ * This is be done by writing to a temp path and then renaming.
+ * <p>
+ * If the destination path exists: Delete it before the rename.
+ * <p>
+ * This will retry so that a rename failure from abfs load or IO errors
+ * such as delete or save failure will not fail the task.
+ * <p>
+ * The {@code manifestSource} supplier is invoked to get the manifest data
+ * on every attempt.
+ * This permits statistics to be updated, <i>including those of failures</i>.
+ * @param manifestSource supplier the manifest/success file
+ * @param tempPath temp path for the initial save
+ * @param finalPath final path for rename.
+ * @param statistic statistic to use for timing
+ * @return the manifest saved.
+ * @throws IOException failure to save/delete/rename after retries.
+ */
+ @SuppressWarnings("unchecked")
+ protected final <T extends AbstractManifestData> T saveManifest(
+ final Supplier<T> manifestSource,
+ final Path tempPath,
+ final Path finalPath,
+ String statistic) throws IOException {
+
+ AtomicInteger retryCount = new AtomicInteger(0);
+ RetryPolicy retryPolicy = retryUpToMaximumCountWithProportionalSleep(
+ getStageConfig().getManifestSaveAttempts(),
+ SAVE_SLEEP_INTERVAL,
+ TimeUnit.MILLISECONDS);
+
+ // loop until returning a value or raising an exception
+ while (true) {
+ try {
+ T manifestData = requireNonNull(manifestSource.get());
+ trackDurationOfInvocation(getIOStatistics(), statistic, () -> {
+ LOG.info("{}: save manifest to {} then rename as {}'); retry
count={}",
+ getName(), tempPath, finalPath, retryCount);
+
+ // delete temp path.
+ // even though this is written with overwrite=true, this extra
recursive
+ // delete also handles a directory being there.
+ deleteRecursive(tempPath, OP_DELETE);
+
+ // save the temp file, overwriting any which remains from an earlier
attempt
+ operations.save(manifestData, tempPath, true);
+
+ // delete the destination in case it exists either from a failed
previous
+ // attempt or from a concurrent task commit.
+ delete(finalPath, true, OP_DELETE);
+
+ // rename temp to final
+ renameFile(tempPath, finalPath);
Review Comment:
you mean use commitFile() after creating a file entry, so pushing more of
the recovery down? we could do that. we won't have the etag of the create file
though.
> [ABFS] Improve commit resilience and performance in Manifest Committer
> ----------------------------------------------------------------------
>
> Key: MAPREDUCE-7474
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7474
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Components: client
> Affects Versions: 3.4.0, 3.3.6
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Major
> Labels: pull-request-available
>
> * Manifest committer is not resilient to rename failures on task commit
> without HADOOP-18012 rename recovery enabled.
> * large burst of delete calls noted: are they needed?
> relates to HADOOP-19093 but takes a more minimal approach with goal of
> changes in manifest committer only.
> Initial proposed changes
> * retry recovery on task commit rename, always (repeat save, delete, rename)
> * audit delete use and see if it can be pruned
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]