gaborgsomogyi commented on issue #24932: [MINOR][SQL][DOCS] failOnDataLoss has effect on batch queries so fix the doc
URL: https://github.com/apache/spark/pull/24932#issuecomment-504376180
 
 
   Just for the sake of understanding, the following exception comes if `failOnDataLoss` is set to `true` in the mentioned test:
   ```
   Job aborted.
   org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:172)
        at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:522)
        at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:219)
        at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:177)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:123)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:168)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:192)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:189)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:164)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:105)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:105)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:718)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:718)
        at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:515)
        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:493)
        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:449)
        at org.apache.spark.sql.kafka010.KafkaDontFailOnDataLossSuite.$anonfun$new$10(KafkaDontFailOnDataLossSuite.scala:181)
        at org.apache.spark.sql.kafka010.KafkaDontFailOnDataLossSuite.$anonfun$new$10$adapted(KafkaDontFailOnDataLossSuite.scala:180)
        at org.apache.spark.sql.kafka010.KafkaDontFailOnDataLossSuite.$anonfun$verifyMissingOffsetsDontCauseDuplicatedRecords$3(KafkaDontFailOnDataLossSuite.scala:126)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
        at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:289)
        at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:287)
        at org.apache.spark.sql.kafka010.KafkaDontFailOnDataLossSuite.withTable(KafkaDontFailOnDataLossSuite.scala:84)
        at org.apache.spark.sql.kafka010.KafkaDontFailOnDataLossSuite.verifyMissingOffsetsDontCauseDuplicatedRecords(KafkaDontFailOnDataLossSuite.scala:109)
        at org.apache.spark.sql.kafka010.KafkaDontFailOnDataLossSuite.$anonfun$new$9(KafkaDontFailOnDataLossSuite.scala:180)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
        at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
        at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
        at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
        at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
        at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
        at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
        at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
        at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
        at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56)
        at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
        at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
        at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
        at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
        at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
        at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
        at org.scalatest.Suite.run(Suite.scala:1147)
        at org.scalatest.Suite.run$(Suite.scala:1129)
        at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
        at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
        at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
        at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
        at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
        at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
        at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
        at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
        at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56)
        at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
        at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1346)
        at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1340)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
        at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
        at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
        at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
        at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
        at org.scalatest.tools.Runner$.run(Runner.scala:850)
        at org.scalatest.tools.Runner.run(Runner.scala)
        at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
        at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 3 times, most recent failure: Lost task 0.2 in stage 0.0 (TID 2, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:285)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:126)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.IllegalStateException: Cannot fetch offset 0 (GroupId: spark-kafka-relation-5d38244d-c486-4869-acb8-83efddcbb614-executor, TopicPartition: failOnDataLoss-0-0).
   Some data may have been lost because they are not available in Kafka any more; either the
    data was aged out by Kafka or the topic may have been deleted before all the data in the
    topic was processed. If you don't want your streaming query to fail on such cases, set the
    source option "failOnDataLoss" to "false".

        at org.apache.spark.sql.kafka010.InternalKafkaConsumer$.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$reportDataLoss0(KafkaDataConsumer.scala:646)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.reportDataLoss(KafkaDataConsumer.scala:452)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.$anonfun$get$1(KafkaDataConsumer.scala:273)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:213)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:238)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:65)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:60)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:510)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:113)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:104)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:701)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:701)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1384)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:270)
        ... 9 more
   Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {failOnDataLoss-0-0=0}
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:990)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:491)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchData(KafkaDataConsumer.scala:474)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchRecord(KafkaDataConsumer.scala:365)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.$anonfun$get$1(KafkaDataConsumer.scala:255)
        ... 31 more
   
   Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1952)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1940)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1939)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1939)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:943)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:943)
        at scala.Option.foreach(Option.scala:274)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:943)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2169)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2118)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2107)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:745)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
        ... 83 more
   Caused by: org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:285)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:126)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.IllegalStateException: Cannot fetch offset 0 (GroupId: spark-kafka-relation-5d38244d-c486-4869-acb8-83efddcbb614-executor, TopicPartition: failOnDataLoss-0-0).
   Some data may have been lost because they are not available in Kafka any more; either the
    data was aged out by Kafka or the topic may have been deleted before all the data in the
    topic was processed. If you don't want your streaming query to fail on such cases, set the
    source option "failOnDataLoss" to "false".

        at org.apache.spark.sql.kafka010.InternalKafkaConsumer$.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$reportDataLoss0(KafkaDataConsumer.scala:646)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.reportDataLoss(KafkaDataConsumer.scala:452)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.$anonfun$get$1(KafkaDataConsumer.scala:273)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:213)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:238)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:65)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:60)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:510)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:113)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:104)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:701)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:701)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1384)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:270)
        ... 9 more
   Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {failOnDataLoss-0-0=0}
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:990)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:491)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchData(KafkaDataConsumer.scala:474)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchRecord(KafkaDataConsumer.scala:365)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.$anonfun$get$1(KafkaDataConsumer.scala:255)
        ... 31 more
   ```
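   
   For reference, here is a minimal sketch of the kind of batch (non-streaming) Kafka read that hits this code path; the broker address, topic and table names below are illustrative, not the exact values used by `KafkaDontFailOnDataLossSuite`:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   object FailOnDataLossBatchExample {
     def main(args: Array[String]): Unit = {
       // Requires the spark-sql-kafka-0-10 connector on the classpath.
       val spark = SparkSession.builder()
         .master("local[*]")
         .appName("failOnDataLoss batch example")
         .getOrCreate()
   
       // Batch read from Kafka with failOnDataLoss explicitly set to true.
       val df = spark.read
         .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092") // illustrative broker
         .option("subscribe", "failOnDataLoss-topic")          // illustrative topic
         .option("startingOffsets", "earliest")
         .option("endingOffsets", "latest")
         .option("failOnDataLoss", "true")
         .load()
   
       // The write triggers the actual Kafka fetch; saveAsTable matches the
       // DataFrameWriter path visible in the stack trace above.
       df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .write
         .saveAsTable("kafka_batch_result") // illustrative table name
   
       spark.stop()
     }
   }
   ```
   
   With `failOnDataLoss` set to `true`, such a batch query fails with the `IllegalStateException` above once the requested offsets have been aged out of Kafka; setting the option to `false` lets the read continue, which is exactly why the option affects batch queries and not only streaming ones.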
   
