[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579242#comment-16579242 ]

bharath kumar avusherla commented on SPARK-23050:

[~ste...@apache.org], I can start working on it.

> Structured Streaming with S3 file source duplicates data because of eventual
> consistency.
>
> Key: SPARK-23050
> URL: https://issues.apache.org/jira/browse/SPARK-23050
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Yash Sharma
> Priority: Major
>
> Spark Structured Streaming with an S3 file source duplicates data because of
> eventual consistency.
> Reproducing the scenario:
> - Structured Streaming reads from an S3 source and writes back to S3.
> - When a task completes, Spark tries to commit it by verifying that all the
> files have been written to the filesystem ({{ManifestFileCommitProtocol.commitTask}}).
> - [Eventual consistency issue] Spark finds that the file is not present and
> fails the task: {{org.apache.spark.SparkException: Task failed while writing
> rows. No such file or directory
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'}}
> - By this time, S3 eventually has the file.
> - Spark reruns the task and completes it, but this time the output gets a new
> file name ({{ManifestFileCommitProtocol.newTaskTempFile}}):
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet
> - The data is duplicated in the results: the same data is processed twice and
> written to S3.
> - There is no duplication if Spark can list all the committed files and all
> tasks succeed.
> Code:
> {code}
> query = selected_df.writeStream \
>     .format("parquet") \
>     .option("compression", "snappy") \
>     .option("path", "s3://path/data/") \
>     .option("checkpointLocation", "s3://path/checkpoint/") \
>     .start()
> {code}
> Same-sized duplicate S3 files:
> {code}
> $ aws s3 ls s3://path/data/ | grep part-00256
> 2018-01-11 03:37:00      17070 part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet
> 2018-01-11 03:37:10      17070 part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet
> {code}
> Exception on S3 listing and task failure:
> {code}
> [Stage 5:>(277 + 100) / 597]18/01/11 03:36:59 WARN TaskSetManager: Lost task 256.0 in stage 5.0 (TID
> org.apache.spark.SparkException: Task failed while writing rows
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'
>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:816)
>     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:509)
>     at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>     at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
>     at
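To make the failure sequence in the issue description concrete, here is a toy simulation in plain Python. It uses neither Spark nor any real S3 client: `LaggyStore`, `run_task` and `run_with_retries` are hypothetical names, and the store simply assumes that a freshly written object can be invisible to the first existence check. It shows why a failed-then-retried task leaves two same-sized part files: each attempt writes under a fresh UUID-based name, and (as assumed here) nothing deletes the failed attempt's file.

```python
import uuid


class LaggyStore:
    """Toy object store (hypothetical): the next `lagging_puts` PUTs are
    invisible to their first HEAD, modelling a stale 404."""

    def __init__(self, lagging_puts=0):
        self.objects = {}           # key -> bytes actually stored
        self.invisible = set()      # keys whose next HEAD will (wrongly) miss
        self.lagging_puts = lagging_puts

    def put(self, key, data):
        self.objects[key] = data
        if self.lagging_puts > 0:
            self.lagging_puts -= 1
            self.invisible.add(key)

    def head(self, key):
        if key in self.invisible:
            self.invisible.discard(key)   # visible from the next probe on
            return False
        return key in self.objects

    def list(self, prefix):
        return sorted(k for k in self.objects if k.startswith(prefix))


def run_task(store, partition, data):
    # Each attempt gets a new UUID-based name, as described for newTaskTempFile.
    key = f"data/part-{partition:05d}-{uuid.uuid4()}-c000.snappy.parquet"
    store.put(key, data)
    # Task commit checks the file exists; a stale 404 fails the attempt.
    if not store.head(key):
        raise FileNotFoundError(key)
    return key


def run_with_retries(store, partition, data, max_attempts=4):
    for _ in range(max_attempts):
        try:
            return run_task(store, partition, data)
        except FileNotFoundError:
            continue  # rerun; the failed attempt's file is left behind
    raise RuntimeError("task failed after retries")


store = LaggyStore(lagging_puts=1)
committed = run_with_retries(store, 256, b"x" * 17070)
# A plain directory listing (like `aws s3 ls`) sees both attempts' files.
print(len(store.list("data/part-00256-")))  # prints: 2
```

Only the second attempt's file is returned as committed, yet the listing contains both, which matches the two 17070-byte `part-00256` files in the report.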
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574276#comment-16574276 ]

Steve Loughran commented on SPARK-23050:

bq. Is there any way we can avoid this happening?

With Amazon EMR, pay the premium for EMR [Consistent View|https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html]. For ASF Hadoop, plus CDH and HDP, use [S3Guard|https://hadoop.apache.org/docs/r3.1.0/hadoop-aws/tools/hadoop-aws/s3guard.html].

Now, what's really needed is a checkpoint mechanism for Spark streaming which works with S3's consistency model (create consistency iff you don't do a HEAD/GET first). If you do want to get involved in this, then what I've promised in HADOOP-15460 might help, as it promises faster writes without any inconsistency-inducing HEAD call first. As these are open source projects, this is an area where you have an opportunity to get involved.
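For the ASF S3A connector, S3Guard is switched on through Hadoop configuration. A minimal sketch, assuming Spark's convention of copying `spark.hadoop.*` properties into the Hadoop configuration and the DynamoDB-backed metadata store described in the Hadoop 3.1 S3Guard documentation; the helper name `s3guard_spark_conf` and the table/region values are illustrative only. S3Guard applies to `s3a://` paths, not to the EMR `s3://` connector that appears in the stack trace.

```python
def s3guard_spark_conf(table, region, create_table=False):
    """Hypothetical helper: spark.hadoop.* settings enabling S3Guard
    with a DynamoDB metadata store for s3a:// paths."""
    hadoop = {
        # Consistent metadata store consulted for listings and existence checks.
        "fs.s3a.metadatastore.impl":
            "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore",
        "fs.s3a.s3guard.ddb.table": table,
        "fs.s3a.s3guard.ddb.region": region,
        # Whether S3Guard may create the DynamoDB table if it is missing.
        "fs.s3a.s3guard.ddb.table.create": str(create_table).lower(),
    }
    # Spark forwards spark.hadoop.<key> to the Hadoop Configuration as <key>.
    return {"spark.hadoop." + k: v for k, v in hadoop.items()}


conf = s3guard_spark_conf("my-s3guard-table", "us-east-1")
```

Each pair could then be passed to `SparkSession.builder.config(key, value)`, with the query's `path` and `checkpointLocation` switched from `s3://` to `s3a://` URLs.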
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16573700#comment-16573700 ]

bharath kumar avusherla commented on SPARK-23050:

Is there any way we can avoid this happening? We also recently observed the same issue when reading from a Kafka topic and storing the output in S3 (and checkpointing in S3). We are using Spark Structured Streaming 2.3.0.
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333809#comment-16333809 ]

Yash Sharma commented on SPARK-23050:

Hi [~ste...@apache.org],

Thanks for bringing this great discussion to the ticket. I will turn on the debug logs and provide more feedback soon. Until then, here are the details of the job:
* Instance type: the instances are fairly large, R3.8xlarge (64 vCPUs, 244 GB memory), but I have seen the issue with smaller instances as well.
* Data processed: input ~110 GB, output 2 GB per day, streaming. The data volume is usually uniform and there is no load on the machines. About 1600 output files are written.
* Less data: I suspect the issue would occur less often with smaller data, but I need to confirm that. At the current data load the error appears in about 5% of runs; not all runs have the issue. The fraction of duplicated files is also very low: 3 of 1600. I strongly suspect that a lower volume would not reproduce the issue.
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333477#comment-16333477 ]

Steve Loughran commented on SPARK-23050:

There's one thing which worries me here: the implication that it's repeatable. Caching of negative (404) entries is rarely observed, at least by me. Two possibilities spring to mind:
# The VM rented is fast enough that it really is outrunning S3. That's the WONTFIX.
# The PUT isn't completing before {{commitTask()}} is called. That could happen if the upload is being done asynchronously: either the executor's thread is still running when the task is committed (unlikely), or the upload to S3 is being done in a thread pool and the final POST to complete the write is still outstanding. That I can believe.

[~yash...@gmail.com], some questions and then some homework for you:
* What EC2 VM type is this?
* How much data is being written?

Homework:
* Does it still happen on different VMs?
* Does it happen if you generate less data?
* If you switch to the s3a connector and log org.apache.hadoop.fs.s3a at debug level, what does it say about completing the upload? And, assuming you set up log4j to log thread names, which thread does it happen in?

Finally, if you set up S3 on your target bucket to log HTTP requests, what is the ordering of the PUT/POST of the upload and the HEAD of the getFileStatus call? That can be examined even with EMR?
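The request-ordering question can be answered mechanically once the bucket's access logs are parsed. A rough sketch, assuming the log lines have already been reduced to `(timestamp, operation, key, http_status)` tuples; that shape, the helper `classify_404`, and the example key are hypothetical, and the operation strings are modelled on S3 server access log names, so check them against the AWS log-format documentation. S3 access-log delivery is best-effort, so a missing entry proves little.

```python
from datetime import datetime, timedelta

# Operation names modelled on S3 server access logs (verify before relying on them).
HEAD_OP = "REST.HEAD.OBJECT"
COMPLETE_OPS = {"REST.PUT.OBJECT", "REST.POST.UPLOAD"}  # single PUT / multipart complete


def classify_404(records, key):
    """Say whether a 404 HEAD on `key` came before or after its upload completed."""
    completed_at = None
    for ts, op, k, status in sorted(records):
        if k != key:
            continue
        if op in COMPLETE_OPS and status == 200 and completed_at is None:
            completed_at = ts
        elif op == HEAD_OP and status == 404:
            if completed_at is None:
                # Probe arrived while the upload was still outstanding
                # (the second possibility above).
                return "404 before upload completed"
            # Object was complete, yet a stale/negative-cached 404 came back.
            return "404 after upload completed"
    return "no 404 HEAD logged"


t0 = datetime(2018, 1, 1)  # synthetic timestamps, not taken from the report
key = "data/part-00256-example.snappy.parquet"
synthetic = [
    (t0, "REST.POST.UPLOADS", key, 200),             # multipart initiate
    (t0 + timedelta(seconds=1), HEAD_OP, key, 404),  # getFileStatus probe
    (t0 + timedelta(seconds=2), "REST.POST.UPLOAD", key, 200),
]
print(classify_404(synthetic, key))  # prints: 404 before upload completed
```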
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327649#comment-16327649 ]

Steve Loughran commented on SPARK-23050:

{quote}
Is there an API to detect S3-like file systems?
{quote}

Not really, no, because they are all fairly different. HADOOP-9565 documents past attempts to do a cloud-store-specific API with both capabilities and operations (PUT, LIST, ...). The problem there is that to do the interesting stuff you need to move off the lowest common denominator; anything with some "Blobstore" interface to look for is limited and potentially too pessimistic (WASB and ADLS have stronger consistency, etc.).

I'd expect in future to at least declare FS semantics more, the way Hadoop IO streams now have a {{StreamCapabilities.hasCapability(String)}} probe. HADOOP-14707 formalises exactly that for filesystems, and S3A in trunk implements {{StreamCapabilities}} already.

For now, the one thing which will work everywhere is to have some option for checkpointing in place rather than write+rename.

As for how best to handle this FNFE: a busy-wait for a few seconds *should* fix S3 inconsistency, but it doesn't handle the non-failure condition of "no file written because there was no interesting data", which ORC can, I believe, raise. [~dongjoon] will be the expert there.
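The capability-probe idea and the "checkpoint in place rather than write+rename" option combine naturally: probe the store, then pick the write strategy. A sketch on the local filesystem, assuming a hypothetical capability string `ATOMIC_OVERWRITE` and a toy `LocalFS` wrapper; neither is a real Hadoop constant or API, and `has_capability` only mimics the shape of the probe described above.

```python
import os
import tempfile

# Hypothetical capability name; not a real Hadoop constant.
ATOMIC_OVERWRITE = "fs.capability.atomic-overwrite"


class LocalFS:
    """Toy filesystem wrapper exposing a capability probe (hypothetical API)."""

    def __init__(self, capabilities=()):
        self.capabilities = set(capabilities)

    def has_capability(self, name):
        return name in self.capabilities


def write_checkpoint(fs, path, data):
    if fs.has_capability(ATOMIC_OVERWRITE):
        # Stands in for a single atomic PUT on an object store. On a local
        # disk this plain write is NOT atomic; it only shows the code path.
        with open(path, "wb") as f:
            f.write(data)
        return "in-place"
    # Filesystem style: write a temp file, then rename it over the target.
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".", prefix=".tmp-")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        os.replace(tmp, path)  # atomic rename on POSIX filesystems
    except BaseException:
        if os.path.exists(tmp):
            os.remove(tmp)
        raise
    return "write+rename"
```

On S3 the rename branch is the expensive one, because a rename is a copy followed by a delete, which is why a probe that lets callers skip it is attractive.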
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327632#comment-16327632 ]

Shixiong Zhu commented on SPARK-23050:

[~ste...@apache.org] Yeah, that's a good improvement for S3. Is there an API to detect S3-like file systems?
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326330#comment-16326330 ]

Steve Loughran commented on SPARK-23050:

Quick review of the code.

Yes, there's potentially a failure if a cached 404 is picked up during task commit. It'd be slightly less brittle to return the array of URIs in the task commit message and have {{commitJob}} call getFileStatus() for each. That'd eliminate the problem except for any task committed immediately before commitJob whose ref was still in the negative cache of the S3 load balancers. It would also help catch the potential issue "file is lost between task commit and job commit".

Even so, it'd be safe to do a little retry, a bit like ScalaTest's {{eventually()}}, to deal with that negative caching. It *should* only last a few seconds at worst (we don't have any real figures on it; it's usually so rarely seen, at least with the s3a client).

Following the commitJob code path, {{HDFSMetadataLog}} could be made object-store aware and opt for a direct (atomic) overwrite of the log, rather than the write to temp & rename. Without that, time to commit becomes O(files) rather than O(1).
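The job-level check proposed in Steve Loughran's review (tasks return the URIs they wrote, commitJob verifies each with a short `eventually()`-style retry) might look roughly like the Python sketch below. It assumes nothing about Spark's real classes: `eventually` and `commit_job` are hypothetical helpers, task results are plain lists of URIs, and `get_file_status` is any callable that raises `FileNotFoundError` while an object is not yet visible. The 10-second default timeout is an assumption, since there are no real figures for how long the 404 stays cached.

```python
import time


def eventually(check, timeout=10.0, interval=0.5, sleep=time.sleep, clock=time.monotonic):
    """Hypothetical helper: retry `check` until it stops raising
    FileNotFoundError, re-raising once `timeout` seconds have elapsed.
    Loosely modelled on ScalaTest's eventually()."""
    deadline = clock() + timeout
    while True:
        try:
            return check()
        except FileNotFoundError:
            if clock() >= deadline:
                raise  # still missing after the grace period: a real failure
            sleep(interval)


def commit_job(task_results, get_file_status, **retry):
    """Hypothetical job commit: each task result is the list of URIs that task
    wrote; only the driver stats them (with retries) before the batch would be
    recorded in the metadata log."""
    statuses = []
    for uris in task_results:
        for uri in uris:
            statuses.append(eventually(lambda u=uri: get_file_status(u), **retry))
    return statuses
```

Moving the check from each task to the job means a transient 404 costs a short wait on the driver rather than a failed task, and a failed task is what triggers the rerun under a new file name in the report.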
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324745#comment-16324745 ]

Steve Loughran commented on SPARK-23050:

This s3n is the Amazon EMR closed-source implementation; nothing the ASF can handle.

AWS S3 has create consistency: if you create a file which has never existed and do a HEAD or GET against it, you will see it. So it shouldn't be failing in getFileStatus, except in the special case of the S3 load balancers briefly caching 404 responses. A previous HEAD before the write may have caused the 404 to be cached.

Amazon offer a consistent view as a [premium option|https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html]. Not only should this offer list consistency, you are likely to get a speedup on things like job partitioning, streaming scans of an S3 source bucket and similar. Disclaimer: I've not used it and have no evidence.

The ASF s3a connector has something similar in Hadoop 3.1: S3Guard, which again uses DynamoDB for consistency and performance. And HADOOP-13884 is the proposal to fix that negative cache problem.

I've not looked at the Spark streaming commit protocol to see what it expects from its store. Clearly it expects the store to be a "real" FS. If it's just trying to enumerate stuff, and doesn't expect rename() to be atomic or similar, then some retry waiting for an FNFE to go away could work. I'd need to look at the code to see.

Meanwhile, for your entertainment, here is what it takes [to commit to S3|https://github.com/steveloughran/zero-rename-committer/releases/tag/tag_draft_002].
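To see how a file that was just written can still be reported missing, the Python toy model below caches a 404 for `ttl` seconds whenever a HEAD misses, matching the case in Steve Loughran's comment where a HEAD issued before the write gets its 404 cached. It is a hypothetical illustration of the described behaviour, not how S3 or its load balancers are actually implemented; an injectable clock keeps it deterministic.

```python
class NegativeCachingStore:
    """Hypothetical toy model of an object store whose front end briefly
    caches 404 responses for keys that were probed before they existed."""

    def __init__(self, ttl, clock):
        self.ttl = ttl            # seconds a cached 404 stays in effect
        self.clock = clock        # callable returning the current time
        self.objects = {}         # key -> bytes
        self.cached_404 = {}      # key -> time at which the cached 404 expires

    def put(self, key, data):
        # Writing does NOT invalidate a cached 404; that is the whole problem.
        self.objects[key] = data

    def head(self, key):
        expiry = self.cached_404.get(key)
        if expiry is not None and self.clock() < expiry:
            raise FileNotFoundError(key)  # stale 404 served from the cache
        if key not in self.objects:
            self.cached_404[key] = self.clock() + self.ttl
            raise FileNotFoundError(key)
        return len(self.objects[key])     # "file status": just the size here
```

A key that is never probed before its PUT is visible immediately (create consistency); a key probed first stays invisible until its cached 404 expires, which is the window a retry has to cover.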
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324717#comment-16324717 ]

Shixiong Zhu commented on SPARK-23050:

[~yash...@gmail.com] Spark SQL should handle it. Yes, unfortunately, external tools such as Presto and Hive don't understand the logs. "s3a" will not make any difference and is not related to this issue.
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324711#comment-16324711 ]

Yash Sharma commented on SPARK-23050:

[~marmbrus], [~shixi...@databricks.com] When we say "read via Spark", does that mean via the readStream API again? What if there is a Spark SQL/Hive table sitting directly on top of the S3 location? We have an external table on the output S3 location which is consumed by Presto, Hive and Spark SQL. They view the files directly, hence we still see duplicated data :). I can work on the new protocol if it's already on the roadmap to solve this problem. Meanwhile I will also try s3a, as suggested by [~sowen], to see if it solves the problem at hand.
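For external readers such as Presto or Hive, one possible workaround is to derive the set of committed files from the sink's `_spark_metadata` log instead of from a directory listing. The Python sketch below is a hypothetical helper and rests on an assumption about the log layout written by Spark 2.x's file sink: each log file starts with a version line such as `v1`, followed by one JSON object per line carrying at least `path` and `action` (`add` or `delete`), with periodic `.compact` files folding earlier batches together. Verify that layout against the Spark version in use before relying on it; callers would pass the latest `.compact` file followed by any later batch files, in order.

```python
import json


def committed_files(log_texts):
    """Hypothetical helper: return the paths recorded as committed in a
    sequence of `_spark_metadata` log file contents, applied in order.

    ASSUMED format: a "v<N>" header line, then one JSON entry per line with
    "path" and "action" fields.
    """
    committed = set()
    for text in log_texts:
        lines = text.splitlines()
        if not lines or not lines[0].startswith("v"):
            raise ValueError("log file is missing its version header")
        for line in lines[1:]:
            if not line.strip():
                continue  # tolerate blank lines
            entry = json.loads(line)
            if entry.get("action") == "delete":
                committed.discard(entry["path"])
            else:
                committed.add(entry["path"])
    return committed
```

The resulting set could then be used to build a manifest for the external table or to find orphaned part files; in the reported case the orphan would be the first attempt's `part-00256-65ae782d-...` file, which never reached the log.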
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323883#comment-16323883 ]

Michael Armbrust commented on SPARK-23050:

[~zsxwing] is correct. While it is possible for files to get written out more than once, the whole point of the log is to make sure that readers will only see exactly one copy. It is possible we will come up with a new streaming commit protocol as part of the migration to the V2 data source, but for now I would call this working as intended.
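Michael Armbrust's point, that files may be written more than once while the log guarantees readers see exactly one copy, can be illustrated with a Python toy model. Everything in it is hypothetical: `run_batch` stands in for a micro-batch whose failed task attempts leave their files behind, and `visible_via_log` stands in for a log-aware reader. A raw directory listing (what an external Hive or Presto table sees) includes the orphaned attempt; the log-filtered view does not.

```python
import uuid


def run_batch(partitions, attempts):
    """Hypothetical toy micro-batch. Every task attempt writes a file under a
    fresh random name, but only the final (successful) attempt for each
    partition is recorded in the log. `attempts` maps partition -> attempts."""
    directory, log = [], []
    for p in partitions:
        for _ in range(max(1, attempts.get(p, 1))):
            name = f"part-{p:05d}-{uuid.uuid4()}-c000.snappy.parquet"
            directory.append(name)  # failed attempts leave their file behind
        log.append(name)            # only the last attempt's file is committed
    return directory, log


def visible_via_log(directory, log):
    """What a log-aware reader sees: listed files that the log recorded."""
    committed = set(log)
    return [name for name in directory if name in committed]
```

With partition 256 needing two attempts, the directory holds two `part-00256-...` files while the log-filtered view holds one, mirroring the listing in the report.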
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323329#comment-16323329 ]

Sean Owen commented on SPARK-23050:

Also, have you read Steve's documentation on how S3 works with Spark? I think you need to use s3a:, among other things.
[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.
[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323320#comment-16323320 ]

Shixiong Zhu commented on SPARK-23050:

How do you read the output? If you use Spark to read the output, it will only read the successful files which are stored in the query metadata.