[jira] [Commented] (SPARK-31911) Using S3A staging committer, pending uploads are committed more than once and listed incorrectly in _SUCCESS data

2022-03-10 Thread Steve Loughran (Jira)


[ https://issues.apache.org/jira/browse/SPARK-31911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504273#comment-17504273 ]

Steve Loughran commented on SPARK-31911:


I'm going to close this as fixed now; the Spark changes will have done it.

> Using S3A staging committer, pending uploads are committed more than once and 
> listed incorrectly in _SUCCESS data
> --------------------------------------------------------------------------------------------------
>
> Key: SPARK-31911
> URL: https://issues.apache.org/jira/browse/SPARK-31911
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.4.4
>Reporter: Brandon
>Priority: Major
>
> First of all, thanks for the great work on the S3 committers. I was able to 
> set up the directory staging committer in my environment following the docs at 
> [https://github.com/apache/spark/blob/master/docs/cloud-integration.md#committing-work-into-cloud-storage-safely-and-fast]
>  and tested one of my Spark applications with it. The Spark version is 2.4.4 
> with Hadoop 3.2.1 and the cloud committer bindings. The application writes 
> multiple DataFrames to ORC/Parquet in S3, submitting them as write jobs to 
> Spark in parallel.
> I think I'm seeing a bug where the staging committer completes pending 
> uploads more than once. The main symptom, and how I discovered this, is that 
> the _SUCCESS data files under each table contain overlapping file names that 
> belong to separate tables. From my reading of the code, that's because the 
> filenames in _SUCCESS reflect which multipart uploads were completed in the 
> commit for that particular table.
> An example:
> Concurrently, fire off DataFrame.write.orc("s3a://bucket/a") and 
> DataFrame.write.orc("s3a://bucket/b"). Suppose each table has one partition, 
> so each writes one partition file.
> When the two writes are done:
>  * /a/_SUCCESS contains two filenames: /a/part- and /b/part-.
>  * /b/_SUCCESS contains the same two filenames.
> Setting S3A logs to debug, I see that the commitJob operation belonging to 
> table a includes completing the uploads of /a/part- and /b/part-. Then again, 
> commitJob for table b includes the same completions. I haven't had a problem 
> yet, but I wonder whether these extra requests would become an issue at 
> higher scale, where dozens of commits with hundreds of files may be happening 
> concurrently in the application.
> I believe this may be caused by the way the pendingSet files are stored in 
> the staging directory. In the Hadoop code, they are stored under one directory 
> named by the jobID. However, for all write jobs executed by the Spark 
> application, the jobID passed to Hadoop is the same: the application ID. 
> Maybe the staging commit algorithm was built on the assumption that each 
> instance of the algorithm would use a unique random jobID.
> [~ste...@apache.org], [~rdblue]: having seen your names on most of this work 
> (thank you), I would be interested to know your thoughts on this. Also, it's 
> my first time opening a bug here, so let me know if there's anything else I 
> can do to help report the issue.
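
For concreteness, a minimal sketch of the scenario described above (assumes a 
spark-shell session with the directory staging committer already configured; 
the bucket name and DataFrame are illustrative):

{code:scala}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// `spark` is the SparkSession provided by spark-shell.
val df = spark.range(10).toDF("id")

// Two independent write jobs fired off concurrently from the same application.
// Both share spark.app.id, which the staging committer uses as the jobID for
// its staging directory in the cluster filesystem.
val writeA = Future { df.write.orc("s3a://bucket/a") }
val writeB = Future { df.write.orc("s3a://bucket/b") }
Await.result(writeA, Duration.Inf)
Await.result(writeB, Duration.Inf)

// Expected: /a/_SUCCESS lists only /a/part-* files.
// Reported: both _SUCCESS files list the part files of both tables.
{code}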






[jira] [Commented] (SPARK-31911) Using S3A staging committer, pending uploads are committed more than once and listed incorrectly in _SUCCESS data

2020-11-09 Thread Steve Loughran (Jira)


[ https://issues.apache.org/jira/browse/SPARK-31911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17228775#comment-17228775 ]

Steve Loughran commented on SPARK-31911:


Yeah, it's the UUID. See SPARK-33230 for the problem: if you run two stages in 
the same second, you get the conflict.

The writeJobUUID was something Spark used to set; it's been restored. The S3A 
committers in Hadoop will add extra resilience here in HADOOP-17318.

Your fix is only partial. It separates the jobs in the shared cluster FS, but 
individual tasks still use the Hadoop job + task attempt IDs to identify their 
task-attempt-specific work directories. If two task attempts on the same host 
are executed from jobs launched in the same second, then you still get a 
conflict. That is fixed in HADOOP-17318.
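
To illustrate the second-granularity collision (a sketch; the timestamp format 
matches the attempt_20200610044421_... IDs visible in the logs elsewhere in 
this thread, but the helper below is a simplification, not Spark's actual 
code):

{code:scala}
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

// Job/attempt IDs embed a wall-clock timestamp with one-second resolution,
// e.g. the "20200610044421" prefix in attempt_20200610044421_0015_m_07_4399.
def jobTrackerId(): String =
  new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())

// Two jobs launched within the same second get identical IDs, so their task
// attempts resolve to the same staging/work directories.
val idA = jobTrackerId()
val idB = jobTrackerId()
assert(idA == idB) // holds whenever both calls land in the same second
{code}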






[jira] [Commented] (SPARK-31911) Using S3A staging committer, pending uploads are committed more than once and listed incorrectly in _SUCCESS data

2020-06-10 Thread Brandon (Jira)


[ https://issues.apache.org/jira/browse/SPARK-31911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17130164#comment-17130164 ]

Brandon commented on SPARK-31911:

Some more notes:

My Spark app is configured with 
`spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2`. This seems 
relevant because the pendingSet files are managed with a wrapped 
FileOutputCommitter. Using v2 could exacerbate the problem, because the pending 
files become visible to the driver immediately when a task finishes. Even so, 
if my understanding is correct, v1 would still have a race condition, but it 
may present differently (not sure if worse or better).
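
For reference, a sketch of how that property is set (standard SparkSession 
configuration; the app name is illustrative):

{code:scala}
import org.apache.spark.sql.SparkSession

// With algorithm v2, task output is promoted at task commit rather than at
// job commit, so the driver sees pendingSet files as soon as a task finishes.
val spark = SparkSession.builder()
  .appName("orc-writer")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()
{code}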

An additional symptom of this bug is that data-writing tasks randomly fail with 
exceptions such as 
"org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File 
does not exist" while the task is committing its work. Of course, since Spark 
attempts tasks 4 times by default, the job usually gets through, but it slows 
things down. This exception seems to be another symptom of the pendingSet files 
being cleaned up by the driver while tasks are still working on them.


{noformat}
20/06/10 04:45:36 WARN TaskSetManager: Lost task 7.0 in stage 15.0 (TID 4399, 172.17.252.245, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:177)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/s3a-test-staging/www-data/hooray/staging-uploads/_temporary/0/_temporary/attempt_20200610044421_0015_m_07_4399 (inode 64406205) Holder DFSClient_NONMAPREDUCE_330704236_69 does not have any open files.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2800)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFileInternal(FSDirWriteFileOp.java:691)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFile(FSDirWriteFileOp.java:677)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:2843)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:928)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:607)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)

    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1545)
    at org.apache.hadoop.ipc.Client.call(Client.java:1491)
    at org.apache.hadoop.ipc.Client.call(Client.java:1388)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
    at com.sun.proxy.$Proxy28.complete(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:557)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
    at ...
{noformat}

[jira] [Commented] (SPARK-31911) Using S3A staging committer, pending uploads are committed more than once and listed incorrectly in _SUCCESS data

2020-06-05 Thread Brandon (Jira)


[ https://issues.apache.org/jira/browse/SPARK-31911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126936#comment-17126936 ]

Brandon commented on SPARK-31911:

Thanks for looking, [~ste...@apache.org]; I attached logs to the Hadoop ticket.




[jira] [Commented] (SPARK-31911) Using S3A staging committer, pending uploads are committed more than once and listed incorrectly in _SUCCESS data

2020-06-05 Thread Steve Loughran (Jira)


[ https://issues.apache.org/jira/browse/SPARK-31911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126609#comment-17126609 ]

Steve Loughran commented on SPARK-31911:


FYI [~mackrorysd]




[jira] [Commented] (SPARK-31911) Using S3A staging committer, pending uploads are committed more than once and listed incorrectly in _SUCCESS data

2020-06-05 Thread Steve Loughran (Jira)


[ https://issues.apache.org/jira/browse/SPARK-31911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126608#comment-17126608 ]

Steve Loughran commented on SPARK-31911:


BTW, given we complete the multipart uploads only once, I don't think we could 
actually write the files twice.

But: they could be committed by the wrong job, or aborted by the wrong job. And 
I'm surprised the second job commit didn't actually fail.
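
For background on the S3 semantics here: completing a multipart upload consumes 
its upload ID, so a second completion of the same ID would normally be expected 
to fail. A sketch against the AWS SDK v1 API; the bucket, key, and part data 
are placeholders:

{code:scala}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.{CompleteMultipartUploadRequest, PartETag}
import java.util.{List => JList}

val s3 = AmazonS3ClientBuilder.defaultClient()

// Placeholders: these would come from the initiated upload and uploaded parts.
val uploadId: String = ???
val partETags: JList[PartETag] = ???

// First completion succeeds and materializes the object.
s3.completeMultipartUpload(
  new CompleteMultipartUploadRequest("bucket", "a/part-0000", uploadId, partETags))

// Repeating the call with the same uploadId typically fails (404 NoSuchUpload):
// the pending upload ceases to exist once completed, which is why a second job
// commit completing the same uploads would be expected to fail.
{code}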




[jira] [Commented] (SPARK-31911) Using S3A staging committer, pending uploads are committed more than once and listed incorrectly in _SUCCESS data

2020-06-05 Thread Steve Loughran (Jira)


[ https://issues.apache.org/jira/browse/SPARK-31911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126606#comment-17126606 ]

Steve Loughran commented on SPARK-31911:


Thanks for this. Created HADOOP-17066.

I'd like to see those debug logs; if you can, attach them to that JIRA, or if 
not, email me at stevel at apache.org.

At a guess, more than one job is
- either using the same staging dir, so uploading twice,
- or the directories under user.home we use for propagating those .pendingset 
files are the same.

Either way, it's a serious bug. Will look at it ASAP.
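
For reference, the cluster-FS layout in question, pieced together from the path 
in the stack trace earlier in this thread (the general pattern is my reading of 
the committer code; the example values come from that trace):

{noformat}
${fs.s3a.committer.staging.tmp.path}/${user}/${uuid}/staging-uploads/...
e.g. /tmp/s3a-test-staging/www-data/hooray/staging-uploads/_temporary/0/_temporary/attempt_20200610044421_0015_m_07_4399
{noformat}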




[jira] [Commented] (SPARK-31911) Using S3A staging committer, pending uploads are committed more than once and listed incorrectly in _SUCCESS data

2020-06-05 Thread Brandon (Jira)


[ https://issues.apache.org/jira/browse/SPARK-31911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126556#comment-17126556 ]

Brandon commented on SPARK-31911:

Interesting: it looks like the staging committer supports a configuration 
parameter `spark.sql.sources.writeJobUUID` that takes precedence over 
`spark.app.id` for determining the name of the pendingSet directory. This is 
very interesting because `spark.sql.sources.writeJobUUID` is not present 
anywhere in the Spark codebase. Should it be set to a random UUID for each 
write job in Spark?
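
A sketch of what that could look like, assuming the committer reads the 
property from the job's Hadoop configuration (note that setting it on the 
shared hadoopConfiguration, as here, would still race between truly concurrent 
writes):

{code:scala}
import java.util.UUID

// `spark` is the SparkSession, e.g. from spark-shell.
val df = spark.range(10).toDF("id")

// Hypothetical workaround: give each write job its own UUID so its pendingSet
// files land in a distinct staging directory instead of one keyed by
// spark.app.id.
spark.sparkContext.hadoopConfiguration
  .set("spark.sql.sources.writeJobUUID", UUID.randomUUID().toString)
df.write.orc("s3a://bucket/a")
{code}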
