[jira] [Commented] (SPARK-25257) v2 MicroBatchReaders can't resume from checkpoints

2018-08-31 Thread Shixiong Zhu (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-25257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599279#comment-16599279 ]

Shixiong Zhu commented on SPARK-25257:
--

[~mojodna] This issue has been fixed in SPARK-23092.

> v2 MicroBatchReaders can't resume from checkpoints
> --
>
> Key: SPARK-25257
> URL: https://issues.apache.org/jira/browse/SPARK-25257
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Seth Fitzsimmons
>Priority: Major
> Attachments: deserialize.patch
>
>
> When resuming from a checkpoint:
> {code:java}
> writeStream.option("checkpointLocation", 
> "/tmp/checkpoint").format("console").start
> {code}
> The stream reader fails with:
> {noformat}
> osmesa.common.streaming.AugmentedDiffMicroBatchReader@59e19287
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>   ... 1 more
> {noformat}
> The root cause appears to be that the {{SerializedOffset}} (JSON, restored from disk) is never deserialized; I would expect to see something along the lines of {{reader.deserializeOffset(off.json)}} at the failing frame ({{MicroBatchExecution.scala:405}}), unless {{available}} is intended to be deserialized elsewhere.
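A minimal sketch of the missing step being described, assuming a {{reader: MicroBatchReader}} and an {{available}} offset in scope as in {{MicroBatchExecution.runBatch}} (the helper name is illustrative, not Spark API):

{code:scala}
import org.apache.spark.sql.execution.streaming.{Offset => OffsetV1, SerializedOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}

// An offset restored from the checkpoint log arrives as a SerializedOffset
// wrapping raw JSON, while an offset produced in-memory during the same run
// is already the reader's own v2 Offset. Only the former needs an explicit
// round-trip through deserializeOffset before being used as a v2 Offset.
def toV2Offset(reader: MicroBatchReader, available: OffsetV1): OffsetV2 =
  available match {
    case s: SerializedOffset => reader.deserializeOffset(s.json) // resumed from checkpoint
    case o: OffsetV2         => o                                // produced during this run
  }
{code}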

[jira] [Commented] (SPARK-25257) v2 MicroBatchReaders can't resume from checkpoints

2018-08-27 Thread Seth Fitzsimmons (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-25257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594298#comment-16594298 ]

Seth Fitzsimmons commented on SPARK-25257:
--

I traced this a bit further; {{OffsetSeq.toStreamProgress}} looks like it might 
be the right place to deserialize. {{deserialize.patch}} deserializes if the 
source is a {{MicroBatchReader}} (i.e. has a {{deserializeOffset}} method).

Fixing it in {{runBatch}} seems partially right (it's specific to {{MicroBatchReader}}s), but {{available}} (the variable in scope there) may be either a {{SerializedOffset}} or an {{Offset}}, depending on whether a checkpoint is being resumed or the query is starting from a clean slate.
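
For reference, a rough sketch of what that could look like in {{OffsetSeq.toStreamProgress}} (the {{deserialize.patch}} attachment is not reproduced here, so the exact shape below is an assumption):

{code:scala}
// Inside OffsetSeq: unwrap SerializedOffset for v2 readers, which know how to
// rebuild their own Offset type from the checkpointed JSON; v1 sources and
// offsets that are already deserialized pass through unchanged.
def toStreamProgress(sources: Seq[BaseStreamingSource]): StreamProgress = {
  assert(sources.size == offsets.size)
  new StreamProgress ++ sources.zip(offsets).collect {
    case (s: MicroBatchReader, Some(o: SerializedOffset)) => s -> s.deserializeOffset(o.json)
    case (s, Some(o))                                     => s -> o
  }
}
{code}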
