Hello.

I’m Johnny, and I work at Stripe. We’re heavy Spark users and we’ve been
exploring the S3 committers. Currently we first write the data to HDFS
and then upload it to S3, but now that S3 offers strong consistency
guarantees, we are evaluating whether we can write data directly to S3.

We’re running into some performance problems, so I’m hoping someone might
have guidance that can unblock us.

File Format
We are using Parquet as the file format. We do have Iceberg tables as well,
and they are indeed able to commit directly to S3 with minimal local disk
usage. We can’t migrate all of our jobs to Iceberg right now, so we are
looking for a committer that is performant and can write Parquet files
directly to S3 (again with minimal local disk usage).
What have we tried?
We’ve tried using both the “magic” and “directory” committers. We're
setting the following configs (in addition to fs.s3a.committer.name, which
we set to "magic" or "directory"):

"spark.hadoop.fs.s3a.committer.magic.enabled":"true",


"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a":"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",

"spark.sql.sources.commitProtocolClass":"org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",

"spark.sql.parquet.output.committer.class":"org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",

Both committers have shown performance regressions on large jobs. We’re
currently focused on trying to make the directory committer work because
we’ve seen *fewer* slowdowns with that one, but I’ll describe the problems
with each.

We’ve been testing the committers on a large job with 100k tasks (creating
7.3TB of output).
Observations for magic committer

Using the magic committer, we see slowdowns in two places:

   - *S3 Writing (inside the task)*
      - The slowdown seems to occur just after the S3 multipart write. The
        finishedWrite function
        <https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L4253>
        tries to do some cleanup and kicks off the
        deleteUnnecessaryFakeDirectories function
        <https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L4350-L4373>.
      - This causes 503s due to hitting AWS rate limits on
        com.amazonaws.services.s3.model.DeleteObjectsRequest (see the
        retry/throttle sketch right after this list).
      - I'm not sure what directories are actually getting cleaned up here (I
        assume the _magic directories are still needed up until the job commit).
   - *Job Commit*
      - We have not dug into the details here, but assume it is something
        similar to what we’re seeing in the directory committer case below.
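
One thing we’re considering for the 503s, but have not validated, is backing
off harder on throttled requests via the standard S3A retry/throttle settings.
The values below are guesses, not benchmarked recommendations:

import org.apache.spark.sql.SparkSession

// Sketch only: standard S3A throttle-retry settings with unvalidated values;
// the idea is to retry/back off more when the fake-directory cleanup hits 503s.
// Set at session build time so executors pick them up.
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.retry.throttle.limit", "20")        // attempts on throttled requests
  .config("spark.hadoop.fs.s3a.retry.throttle.interval", "1000ms") // base delay between attempts
  .getOrCreate()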

Observations for directory committer

We’ve observed that the “directory” S3 committer’s performance is on par with
our existing HDFS commit for task execution and task commit. The slowdowns
we’re seeing are in the job commit phase.

The job commit happens almost instantaneously in the HDFS case, versus about
an hour with the S3 directory committer.

We’ve enabled DEBUG logging for the S3 committer. It seems like that hour is
mostly spent doing work you would expect (completing ~100k delayed-complete
S3 uploads). I’ve included at the end of this mail an excerpt of the logs we
see repeated over and over during the one-hour job commit (I redacted some
of the directories and SHAs, but the logs are otherwise unchanged).

One thing I notice is that object_delete_requests += 1 appears in the logs.
I’m not sure whether that means it’s issuing an S3 delete, or deleting the
HDFS manifest files (to clean up the task).
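
If the job commit really is completing those 100k uploads with limited
parallelism on the driver, one knob we plan to try is the committer thread
pool size. The value below is a guess, not something we’ve benchmarked:

import org.apache.spark.sql.SparkSession

// Sketch only: fs.s3a.committer.threads controls how many pending uploads the
// committer works on in parallel during job commit. 256 is an unbenchmarked guess.
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.committer.threads", "256")
  .getOrCreate()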

Alternatives - Should we check out directCommitter?
We’ve also considered using the directCommitter. We understand that the
directCommitter is discouraged because it does not support speculative
execution and has problems in some failure cases. Given that we do not use
speculative execution at Stripe, would the directCommitter be a viable
option for us? What are the failure scenarios to consider?

Alternatives - Can S3FileIO work well with Parquet files?

Netflix has a tool called S3FileIO
<https://iceberg.apache.org/aws/#s3-fileio>. We’re wondering if it can be
used directly with Spark, or only with Iceberg.
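
For reference, this is roughly how we understand S3FileIO gets wired in,
based on the Iceberg AWS docs linked above. The catalog name and warehouse
path are placeholders, and as far as we can tell it only applies to Iceberg
tables, which is exactly what we’re asking about for plain Parquet output:

import org.apache.spark.sql.SparkSession

// Sketch only, following the linked Iceberg AWS docs: S3FileIO is plugged in as
// the io-impl of an Iceberg catalog. "my_catalog" and the warehouse path are
// placeholders.
val spark = SparkSession.builder()
  .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.my_catalog.warehouse", "s3://some-bucket/warehouse/")
  .config("spark.sql.catalog.my_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config("spark.sql.catalog.my_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  .getOrCreate()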
Here is the DEBUG log excerpt from the one-hour job commit with the directory committer:

21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.CommitOperations: 
Committing single commit DelayedCompleteData{version=1, 
uri='s3a://some-location.zstd.parquet', destination='tmp/some/location', 
uploadId='some-upload-id', created=1624338855338, saved=1624338855338, 
size=78554000, date='Tue Jun 22 05:14:15 UTC 2021', jobId='', taskId='', 
notes='', etags=[ETAG-HERE]}
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.WriteOperationHelper: 
Completing multipart upload SHAHERE - with 1 parts
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.Tasks: Finished 
count -> 17797/92240
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem: Finished 
write to /location/part-ABC-application_1623777300846_7091.zstd.parquet, len 
79054951
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics: 
object_delete_requests += 1  ->  17801
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem: Finished 
write to /location/part-DEF-application_1623777300846_7091.zstd.parquet, len 
80191566
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics: 
object_delete_requests += 1  ->  17802
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem: Finished 
write to /location/part-GHI-application_1623777300846_7091.zstd.parquet, len 
78175474
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics: 
object_delete_requests += 1  ->  17803
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.CommitOperations: 
Successful commit of file length 79666929
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.Tasks: Task succeeded
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.Tasks: Executing task
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.CommitOperations: 
Committing single commit DelayedCompleteData{version=1, 
uri='s3a://some-location.zstd.parquet', destination='tmp/some/location', 
uploadId='some-upload-id', created=1624338849266, saved=1624338849266, 
size=80385806, date='Tue Jun 22 05:14:09 UTC 2021', jobId='', taskId='', 
notes='', etags=[ETAG-HERE]}
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.WriteOperationHelper: 
Completing multipart upload 
GErt.EpcPiIZdDL2rETwlSqQ7IiM4ZyxDYPs.ZRcDHy94cP2G3ld5CeLmNdiT0vb9zQOjzVw8BT.W6ymNI_2HI16UmXoZBFx6fYfKpHTJ3cbsRl5xROaSTNDe_jBjSYwO01LnnsrUJKMotlBgux8t.9Tt.OZX.Zq_TxQon55330-
 with 1 parts
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.CommitOperations: 
Successful commit of file length 83611204
21/06/22 06:24:42 {} DEBUG org.apache.hadoop.fs.s3a.commit.Tasks: Task succeeded