Hi,
While exploring checkpointing with kafka source and console sink I've
got the exception:
// today's build from master
scala> spark.version
res8: String = 2.3.0-SNAPSHOT
scala> val q = records.
| writeStream.
| format("console").
| option("truncate", false).
| option("checkpointLocation", "/tmp/checkpoint"). // <-- checkpoint directory
| trigger(Trigger.ProcessingTime(10.seconds)).
| outputMode(OutputMode.Update).
| start
org.apache.spark.sql.AnalysisException: This query does not support
recovering from checkpoint location. Delete /tmp/checkpoint/offsets to
start over.;
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284)
... 61 elided
The "trigger" of the exception is the change from SPARK-16116
(https://issues.apache.org/jira/browse/SPARK-16116), and this line in
particular:
https://github.com/apache/spark/pull/13817/files#diff-d35e8fce09686073f81de598ed657de7R277.
Why is this check needed? I can't think of a use case where the console
sink could not recover from a checkpoint location (since all the
information it needs is available). I'm lost on it and would appreciate
some help (to recover :))
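For what it's worth, here is a minimal sketch of what the guard in
StreamingQueryManager.createQuery appears to do, as I read the code: when
the sink is marked as not supporting recovery and the checkpoint's
offsets directory already exists, the query is rejected. The method name
assertRecoverable and the plain java.nio.file calls are my own
simplification, not Spark's actual implementation:

```scala
import java.nio.file.{Files, Paths}

// Simplified paraphrase (assumption) of the checkpoint-recovery guard:
// a sink that cannot recover is refused if offsets already exist.
def assertRecoverable(checkpointLocation: String,
                      recoverFromCheckpointLocation: Boolean): Unit = {
  val offsets = Paths.get(checkpointLocation, "offsets")
  if (!recoverFromCheckpointLocation && Files.exists(offsets)) {
    // Spark throws AnalysisException here; IllegalStateException keeps
    // this sketch dependency-free.
    throw new IllegalStateException(
      s"This query does not support recovering from checkpoint location. " +
      s"Delete $offsets to start over.")
  }
}
```

So deleting /tmp/checkpoint/offsets (as the message says) does make the
query start again, but only by discarding the recorded progress, which is
exactly what I'd like to avoid for the console sink.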
Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski
---------------------------------------------------------------------
To unsubscribe e-mail: [email protected]