My little committer project, an intermediate manifest committer for Azure and GCS, is reaching the stage where it's ready for others to look at:
https://github.com/apache/hadoop/pull/2971 Goals: 1. Correctness even on GCS, which doesn't have atomic dir rename (so v1 isn't safe). It does use FileSystem.rename() for commits; this is not for S3. 2. Performance closer to MR v2 than v1. 3. Work around some issues with the object stores (most recently, ABFS timing out on delete of deep dir trees https://issues.apache.org/jira/browse/HADOOP-17691 ) 4. Integrated stats collection and reporting 5. Not going near any of the working committer code 6. Output 100% compatible with the existing directory-partition model. The manifests are only intermediate. Architecture doc: https://github.com/steveloughran/hadoop/blob/mr/MAPREDUCE-7341-manifest-committer/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md User guide: https://github.com/steveloughran/hadoop/blob/mr/MAPREDUCE-7341-manifest-committer/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md 1. This is really targeting Spark. It works on MR; I've just avoided the job restart problem because Spark doesn't attempt to do that. 2. It will work on HDFS, but I'm not testing there or targeting it. Things like mkdir() are fast there, as is dir rename. I'm reusing the same JSON report in a _SUCCESS marker as the S3A committers, adding: 1. collection of IOStatistics timings of all operations invoked during task and job commits 2. an option to save all stats reports to a directory in a cluster FS, including even on job failures/abort. Attached is what a stats report looks like on a mini terasort (byte sort). It's collecting the task commit numbers via the intermediate manifests, and aggregating them. There is no collection of actual IO during task execution; we'd need per-thread collection of stats there. Please have a look at the PR. Yes, it's big, but it's broken up into lots of stages for ease of understanding and testing. 
One of the lessons from the S3A Committer work. -Steve ------ { "name" : "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1", "timestamp" : 1625693109922, "success" : true, "date" : "2021-07-07T22:25:09.922+01:00[Europe/London]", "hostname" : "stevel-mbp15-13176.local", "committer" : "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter", "description" : null, "jobId" : "job_1625693035829_0001", "jobIdSource" : "JobID", "metrics" : { }, "diagnostics" : { "principal" : "stevel", "stage" : "committer_commit_job" }, "filenames" : [ "abfs:// stevel-test...@ukwest.dfs.core.windows.net/terasort/sortin/part-m-00001", "abfs:// stevel-test...@ukwest.dfs.core.windows.net/terasort/sortin/part-m-00000" ], "state" : null, "iostatistics" : { "counters" : { "op_job_stage_abort" : 0, "committer_commit_job" : 1, "op_load_manifest.failures" : 0, "op_job_stage_optional_validate_output.failures" : 0, "op_load_all_manifests" : 1, "op_rename.failures" : 0, "op_job_stage_create_target_dirs" : 1, "op_task_stage_save_manifest" : 0, "committer_commit_job.failures" : 0, "op_job_stage_cleanup.failures" : 0, "op_stage_task_scan_directory" : 2, "op_task_stage_abort_task.failures" : 0, "op_job_stage_rename_files.failures" : 0, "committer_bytes_committed" : 100000, "op_create_one_directory.failures" : 0, "op_mkdirs.failures" : 0, "committer_files_committed" : 2, "op_load_all_manifests.failures" : 0, "op_save_task_manifest" : 1, "op_job_stage_load_manifests" : 1, "op_task_stage_setup" : 2, "op_stage_task_commit" : 2, "op_create_directories.failures" : 0, "op_stage_task_scan_directory.failures" : 0, "op_create_one_directory" : 0, "op_save_task_manifest.failures" : 0, "op_directory_scan" : 3, "op_rename" : 3, "op_get_file_status" : 8, "op_job_stage_rename_files" : 1, "op_is_directory.failures" : 0, "op_job_stage_abort.failures" : 0, "op_job_stage_optional_validate_output" : 1, "op_task_stage_setup.failures" : 0, "op_delete" : 8, "op_job_stage_setup" : 1, 
"op_directory_scan.failures" : 0, "object_list_request.failures" : 0, "op_load_manifest" : 2, "object_continue_list_request.failures" : 0, "committer_tasks_completed" : 0, "op_task_stage_save_manifest.failures" : 0, "op_get_file_status.failures" : 3, "op_task_stage_abort_task" : 1, "op_delete.failures" : 0, "object_list_request" : 0, "op_job_stage_create_target_dirs.failures" : 0, "op_list_status" : 4, "committer_tasks_failed" : 0, "op_job_stage_cleanup" : 1, "op_job_stage_setup.failures" : 0, "op_job_stage_save_success_marker" : 1, "object_continue_list_request" : 0, "op_stage_task_commit.failures" : 0, "op_create_directories" : 1, "op_job_stage_load_manifests.failures" : 0, "op_mkdirs" : 3, "op_is_directory" : 0, "op_job_stage_save_success_marker.failures" : 0, "op_list_status.failures" : 0 }, "gauges" : { }, "minimums" : { "op_create_directories.failures.min" : -1, "op_is_directory.failures.min" : -1, "object_list_request.min" : -1, "op_task_stage_setup.min" : 121, "op_job_stage_create_target_dirs.failures.min" : -1, "object_continue_list_request.failures.min" : -1, "object_list_request.failures.min" : -1, "op_job_stage_rename_files.min" : 218, "op_directory_scan.min" : 34, "op_job_stage_optional_validate_output.min" : 104, "op_job_stage_setup.failures.min" : -1, "op_list_status.min" : 23, "op_mkdirs.failures.min" : -1, "op_job_stage_abort.failures.min" : -1, "committer_bytes_committed" : -1, "committer_files_committed" : -1, "op_rename.failures.min" : -1, "op_load_all_manifests.min" : 399, "op_job_stage_save_success_marker.failures.min" : -1, "op_load_manifest.failures.min" : -1, "committer_commit_job.min" : 1007, "op_stage_task_scan_directory.failures.min" : -1, "committer_commit_job.failures.min" : -1, "op_delete.min" : 24, "op_job_stage_cleanup.min" : 151, "op_get_file_status.min" : 21, "op_load_all_manifests.failures.min" : -1, "op_get_file_status.failures.min" : 47, "op_create_one_directory.failures.min" : -1, "op_load_manifest.min" : 208, 
"op_create_one_directory.min" : -1, "op_mkdirs.min" : 29, "op_save_task_manifest.min" : 99, "op_task_stage_save_manifest.min" : -1, "op_job_stage_abort.min" : -1, "op_job_stage_optional_validate_output.failures.min" : -1, "op_list_status.failures.min" : -1, "op_directory_scan.failures.min" : -1, "op_job_stage_load_manifests.failures.min" : -1, "committer_tasks_completed" : -1, "op_job_stage_create_target_dirs.min" : 2, "object_continue_list_request.min" : -1, "op_task_stage_setup.failures.min" : -1, "op_job_stage_setup.min" : 693, "committer_tasks_failed" : -1, "op_stage_task_scan_directory.min" : -1, "op_job_stage_rename_files.failures.min" : -1, "op_save_task_manifest.failures.min" : -1, "op_delete.failures.min" : -1, "op_stage_task_commit.failures.min" : -1, "op_stage_task_commit.min" : -1, "op_create_directories.min" : 1, "op_job_stage_load_manifests.min" : 554, "op_task_stage_abort_task.min" : 256, "op_is_directory.min" : -1, "op_task_stage_abort_task.failures.min" : -1, "op_rename.min" : 51, "op_job_stage_save_success_marker.min" : 196, "op_job_stage_cleanup.failures.min" : -1, "op_task_stage_save_manifest.failures.min" : -1 }, "maximums" : { "op_job_stage_load_manifests.max" : 554, "op_load_all_manifests.failures.max" : -1, "op_job_stage_save_success_marker.max" : 196, "op_job_stage_abort.failures.max" : -1, "op_stage_task_commit.max" : -1, "op_create_one_directory.max" : -1, "op_is_directory.failures.max" : -1, "object_list_request.failures.max" : -1, "op_task_stage_setup.max" : 885, "op_job_stage_create_target_dirs.failures.max" : -1, "committer_bytes_committed" : -1, "committer_files_committed" : -1, "op_job_stage_optional_validate_output.max" : 104, "op_directory_scan.max" : 149, "op_mkdirs.max" : 339, "op_create_one_directory.failures.max" : -1, "op_get_file_status.failures.max" : 803, "op_job_stage_cleanup.max" : 151, "op_save_task_manifest.max" : 99, "op_create_directories.failures.max" : -1, "op_rename.failures.max" : -1, 
"op_job_stage_save_success_marker.failures.max" : -1, "op_job_stage_setup.failures.max" : -1, "op_job_stage_load_manifests.failures.max" : -1, "op_directory_scan.failures.max" : -1, "op_job_stage_rename_files.max" : 218, "op_delete.max" : 256, "object_list_request.max" : -1, "committer_commit_job.max" : 1007, "op_task_stage_abort_task.max" : 256, "op_get_file_status.max" : 95, "op_delete.failures.max" : -1, "object_continue_list_request.max" : -1, "op_job_stage_abort.max" : -1, "op_is_directory.max" : -1, "op_task_stage_save_manifest.max" : -1, "op_job_stage_cleanup.failures.max" : -1, "op_list_status.failures.max" : -1, "op_save_task_manifest.failures.max" : -1, "op_load_manifest.max" : 384, "committer_tasks_completed" : -1, "op_load_manifest.failures.max" : -1, "op_stage_task_scan_directory.failures.max" : -1, "committer_commit_job.failures.max" : -1, "op_task_stage_save_manifest.failures.max" : -1, "op_job_stage_create_target_dirs.max" : 2, "op_create_directories.max" : 1, "op_stage_task_commit.failures.max" : -1, "op_mkdirs.failures.max" : -1, "committer_tasks_failed" : -1, "op_list_status.max" : 97, "op_task_stage_setup.failures.max" : -1, "op_task_stage_abort_task.failures.max" : -1, "op_rename.max" : 165, "op_job_stage_optional_validate_output.failures.max" : -1, "op_load_all_manifests.max" : 399, "op_job_stage_setup.max" : 693, "object_continue_list_request.failures.max" : -1, "op_job_stage_rename_files.failures.max" : -1, "op_stage_task_scan_directory.max" : -1 }, "meanstatistics" : { "op_job_stage_optional_validate_output.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_rename.mean" : { "samples" : 3, "sum" : 366 }, "op_load_manifest.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_is_directory.mean" : { "samples" : 0, "sum" : 0 }, "op_create_one_directory.mean" : { "samples" : 0, "sum" : 0 }, "op_job_stage_create_target_dirs.mean" : { "samples" : 1, "sum" : 2 }, "op_rename.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_job_stage_abort.mean" : { 
"samples" : 0, "sum" : 0 }, "op_job_stage_setup.mean" : { "samples" : 1, "sum" : 693 }, "committer_bytes_committed" : { "samples" : 0, "sum" : 0 }, "op_delete.mean" : { "samples" : 8, "sum" : 531 }, "op_stage_task_scan_directory.failures.mean" : { "samples" : 0, "sum" : 0 }, "committer_files_committed" : { "samples" : 0, "sum" : 0 }, "op_job_stage_abort.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_job_stage_save_success_marker.mean" : { "samples" : 1, "sum" : 196 }, "op_load_manifest.mean" : { "samples" : 2, "sum" : 592 }, "op_save_task_manifest.mean" : { "samples" : 1, "sum" : 99 }, "op_task_stage_save_manifest.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_job_stage_optional_validate_output.mean" : { "samples" : 1, "sum" : 104 }, "op_is_directory.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_mkdirs.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_job_stage_cleanup.failures.mean" : { "samples" : 0, "sum" : 0 }, "object_list_request.mean" : { "samples" : 0, "sum" : 0 }, "op_task_stage_abort_task.mean" : { "samples" : 1, "sum" : 256 }, "object_list_request.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_task_stage_setup.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_get_file_status.failures.mean" : { "samples" : 3, "sum" : 1149 }, "op_job_stage_setup.failures.mean" : { "samples" : 0, "sum" : 0 }, "object_continue_list_request.mean" : { "samples" : 0, "sum" : 0 }, "op_task_stage_save_manifest.mean" : { "samples" : 0, "sum" : 0 }, "op_directory_scan.mean" : { "samples" : 3, "sum" : 247 }, "op_job_stage_save_success_marker.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_job_stage_rename_files.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_job_stage_cleanup.mean" : { "samples" : 1, "sum" : 151 }, "op_job_stage_load_manifests.mean" : { "samples" : 1, "sum" : 554 }, "op_get_file_status.mean" : { "samples" : 5, "sum" : 210 }, "committer_tasks_completed" : { "samples" : 0, "sum" : 0 }, "op_list_status.failures.mean" : { "samples" : 0, "sum" : 
0 }, "op_create_directories.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_load_all_manifests.mean" : { "samples" : 1, "sum" : 399 }, "committer_commit_job.mean" : { "samples" : 1, "sum" : 1007 }, "op_create_one_directory.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_load_all_manifests.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_directory_scan.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_mkdirs.mean" : { "samples" : 3, "sum" : 423 }, "op_list_status.mean" : { "samples" : 4, "sum" : 217 }, "op_task_stage_setup.mean" : { "samples" : 2, "sum" : 1006 }, "committer_commit_job.failures.mean" : { "samples" : 0, "sum" : 0 }, "committer_tasks_failed" : { "samples" : 0, "sum" : 0 }, "op_job_stage_create_target_dirs.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_create_directories.mean" : { "samples" : 1, "sum" : 1 }, "op_job_stage_rename_files.mean" : { "samples" : 1, "sum" : 218 }, "op_stage_task_scan_directory.mean" : { "samples" : 0, "sum" : 0 }, "op_task_stage_abort_task.failures.mean" : { "samples" : 0, "sum" : 0 }, "object_continue_list_request.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_delete.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_stage_task_commit.mean" : { "samples" : 0, "sum" : 0 }, "op_job_stage_load_manifests.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_stage_task_commit.failures.mean" : { "samples" : 0, "sum" : 0 }, "op_save_task_manifest.failures.mean" : { "samples" : 0, "sum" : 0 } } } }