Hello. I’m Johnny, I work at Stripe. We’re heavy Spark users and we’ve been exploring using s3 committers. Currently we first write the data to HDFS and then upload it to S3. However, now with S3 offering strong consistency guarantees, we are evaluating if we can write data directly to S3.
We’re having some troubles with performance, so hoping someone might have some guidance which can unblock this. File Format We are using parquet as the File Format. We do have iceberg tables as well, and they are indeed able to commit directly to S3 (with minimal local disk usage). We can’t migrate all of our jobs to iceberg right now. Hence, we are looking for a committer that is performant and can directly write parquet files to S3 (with minimal local disk usage). What have we tried? We’ve tried using both the “magic” and “directory” committers. We're setting the following configs (in addition to the "magic/directory" committer.name). "spark.hadoop.fs.s3a.committer.magic.enabled":"true", "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a":"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory", "spark.sql.sources.commitProtocolClass":"org.apache.spark.internal.io.cloud.PathOutputCommitProtocol", "spark.sql.parquet.output.committer.class":"org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter", Both committers have shown performance regressions on large jobs. We’re currently focused on trying to make the directory committer work because we’ve seen *fewer* slowdowns with that one, but I’ll describe the problems with each. We’ve been testing the committers on a large job with 100k tasks (creating 7.3TB of output). Observations for magic committer Using the magic committer, we see slowdowns in two places: - *S3 Writing* *(inside** the task)* - The slowdown seems to occur just after the s3 multipart write. The finishedWrite <https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L4253> function tries to do some cleanup and kicks off this deleteUnnecessaryFakeDirectories <https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L4350-L4373> function <https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L4350-L4373> . - This causes 503’s due to hitting AWS rate limits on com.amazonaws.services.s3.model.DeleteObjectsRequest - I'm not sure what directories are actually getting cleaned up here (I assume the _magic directories are still needed up until the job commit). - *Job Commit* - Have not dug down into the details here, but assume it is something similar to what we’re seeing in the directory committer case below. Observations for directory committer We’ve observed that the “directory” s3committer performance is on-par with our existing HDFS commit for task execution and task commit. The slowdowns we’re seeing are in the job commit phase. The job commit happens almost instantaneously in the HDFS case, vs taking about an hour for the s3 directory committer. We’ve enabled DEBUG logging for the s3 committer. It seems like that hour is mostly spent doing things which you would expect (completing 100k delayedComplete s3 uploads). I've attached an example of some of the logs we see repeated over-and-over during the 1 hour job commit (I redacted some of the directories and SHAs but the logs are otherwise unchanged). One thing I notice is that we see object_delete_requests += 1 in the logs. I’m not sure if that means it’s doing an s3 delete, or it is deleting the HDFS manifest files (to clean up the task). Alternatives - Should we check out directCommitter? We’ve also considered using the directCommitter. We understand that the directCommitter is discouraged because it does not support speculative execution (and for some failure cases). Given that we do not use speculative execution at Stripe, would the directCommitter be a viable option for us? What are the failure scenarios to consider? Alternatives - Can S3FileIO work well with parquet files? Netflix has a tool called s3FileIO <https://iceberg.apache.org/aws/#s3-fileio>. We’re wondering if it can be used with spark, or only with Iceburg.
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.CommitOperations: Committing single commit DelayedCompleteData{version=1, uri='s3a://some-location.zstd.parquet', destination='tmp/some/location', uploadId='some-upload-id', created=1624338855338, saved=1624338855338, size=78554000, date='Tue Jun 22 05:14:15 UTC 2021', jobId='', taskId='', notes='', etags=[ETAG-HERE]} 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.WriteOperationHelper: Completing multipart upload SHAHERE - with 1 parts 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.Tasks: Finished count -> 17797/92240 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem: Finished write to /location/part-ABC-application_1623777300846_7091.zstd.parquet, len 79054951 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics: object_delete_requests += 1 -> 17801 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem: Finished write to /location/part-DEF-application_1623777300846_7091.zstd.parquet, len 80191566 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics: object_delete_requests += 1 -> 17802 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem: Finished write to /location/part-GHI-application_1623777300846_7091.zstd.parquet, len 78175474 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics: object_delete_requests += 1 -> 17803 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.CommitOperations: Successful commit of file length 79666929 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.Tasks: Task succeeded 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.Tasks: Executing task 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.CommitOperations: Committing single commit DelayedCompleteData{version=1, uri='s3a://some-location.zstd.parquet', destination='tmp/some/location', uploadId='some-upload-id', created=1624338849266, saved=1624338849266, size=80385806, date='Tue Jun 22 05:14:09 UTC 2021', jobId='', taskId='', notes='', etags=[ETAG-HERE]} 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.WriteOperationHelper: Completing multipart upload GErt.EpcPiIZdDL2rETwlSqQ7IiM4ZyxDYPs.ZRcDHy94cP2G3ld5CeLmNdiT0vb9zQOjzVw8BT.W6ymNI_2HI16UmXoZBFx6fYfKpHTJ3cbsRl5xROaSTNDe_jBjSYwO01LnnsrUJKMotlBgux8t.9Tt.OZX.Zq_TxQon55330- with 1 parts 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.CommitOperations: Successful commit of file length 83611204 21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.Tasks: Task succeeded
--------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org