Hi,

Maybe the purpose of the article is different, but instead of:

sources (trail files) --> kafka --> flume --> write to cloud storage --> SSS

a much simpler solution is:

sources (trail files) --> write to cloud storage --> SSS
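As a hedged sketch of what the direct route could need on the reader side (the bucket path, file format, and option values below are made-up placeholders, not from this thread), these are the knobs a file-source stream typically uses; note the file source, unlike the Kafka source, also requires an explicit schema:

```python
# Illustrative sketch of the simpler route: Structured Streaming's file
# source reading trail files straight from cloud storage, with no
# Kafka/Flume hop. All values here are hypothetical placeholders.
def file_source_settings(path, file_format="json", max_files_per_trigger=100):
    """Collect the reader settings for the direct file-source pipeline.

    In real PySpark, file_format goes to .format(), path goes to .load(),
    and the rest go to .options(**...); a .schema(...) call is also
    mandatory for streaming file sources.
    """
    return {
        "format": file_format,
        "path": path,
        # cap how many new files each micro-batch picks up
        "maxFilesPerTrigger": str(max_files_per_trigger),
        # process oldest files first, since trail files arrive in order
        "latestFirst": "false",
    }

settings = file_source_settings("gs://my-bucket/trail-files/")
```

This is only a sketch of the configuration surface, not a runnable Spark job; the point is that the direct pipeline needs one reader and one checkpoint, with no intermediate hops to reason about.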
Putting in additional components and hops just makes this a bit difficult for me to understand.

Regards,
Gourav

On Sat, Feb 26, 2022 at 5:12 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Besides, is the structure of your checkpoint as in this article of mine?
>
> Processing Change Data Capture with Spark Structured Streaming
> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=kPyvLBnnSjWPUR8vIq%2FAQw%3D%3D>
>
> See the section on "The concept of checkpointing and its value with trigger once".
>
> See also
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> You can try clearing the checkpoint directory's contents, running your Spark
> job for a while, killing it with CTRL-C etc., and then checking what the
> entries under the sources sub-directory look like.
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Sat, 26 Feb 2022 at 10:44, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Check the thread I forwarded on how to gracefully shut down Spark
>> Structured Streaming.
>>
>> HTH
>>
>> On Fri, 25 Feb 2022 at 22:31, karan alang <karan.al...@gmail.com> wrote:
>>
>>> Hello All,
>>> I'm running a Structured Streaming program on GCP Dataproc, which reads
>>> data from Kafka, does some processing, and puts the processed data back
>>> into Kafka. The program was running fine until I killed it (to make minor
>>> changes) and then re-started it.
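To make the "look inside the checkpoint directory" suggestion above concrete, here is a hedged, plain-Python helper (no Spark needed). It assumes the standard Structured Streaming checkpoint layout of one file per micro-batch under `offsets/` and `commits/`; a "batch N doesn't exist" on restart generally points at a mismatch between those two logs:

```python
# Plain-Python sketch for eyeballing a Structured Streaming checkpoint
# directory. Assumes the usual layout: <checkpoint>/offsets/<batchId> and
# <checkpoint>/commits/<batchId>, one numerically-named file per micro-batch.
import os

def batch_ids(checkpoint_dir, sub):
    """Sorted batch ids recorded under offsets/ or commits/."""
    path = os.path.join(checkpoint_dir, sub)
    if not os.path.isdir(path):
        return []
    return sorted(int(name) for name in os.listdir(path) if name.isdigit())

def uncommitted_batches(checkpoint_dir):
    """Batches whose offsets were written but never committed.

    On restart Spark tries to re-run these; if the corresponding offset
    file is missing or truncated, you can hit "batch N doesn't exist".
    """
    offsets = batch_ids(checkpoint_dir, "offsets")
    commits = set(batch_ids(checkpoint_dir, "commits"))
    return [b for b in offsets if b not in commits]
```

For example, an `offsets/` log containing batches 42-44 but a `commits/` log ending at 43 would report batch 44 as the one Spark will attempt to replay on restart.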
>>> It is giving me the error:
>>>
>>> pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
>>>
>>> Here is the full error:
>>>
>>> 22/02/25 22:14:08 ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 0b73937f-1140-40fe-b201-cf4b7443b599, runId = 43c9242d-993a-4b9a-be2b-04d8c9e05b06] terminated with error
>>> java.lang.IllegalStateException: batch 44 doesn't exist
>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:286)
>>>     at scala.Option.getOrElse(Option.scala:189)
>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:286)
>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:197)
>>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
>>>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
>>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>> Traceback (most recent call last):
>>>   File "/tmp/0149aedd804c42f288718e013fb16f9c/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 609, in <module>
>>>     query.awaitTermination()
>>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 101, in awaitTermination
>>>   File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
>>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
>>> pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
>>>
>>> Question - what is the cause of this error, and how do I debug/fix it?
>>> Also, I notice that the checkpoint location occasionally gets corrupted
>>> when I do multiple restarts. After checkpoint corruption, the query does
>>> not return any records.
>>>
>>> For the above issue (as well as when the checkpoint was corrupted), when
>>> I cleared the checkpoint location and re-started the program, it went
>>> through fine.
>>>
>>> Please note: while doing readStream, I've enabled failOnDataLoss=false.
>>>
>>> Additional details are in Stack Overflow:
>>> https://stackoverflow.com/questions/71272328/structuredstreaming-error-pyspark-sql-utils-streamingqueryexception-batch-44
>>>
>>> Any input on this?
>>>
>>> tia!
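On the failOnDataLoss=false point: as a hedged sketch (broker and topic names below are placeholders, not from this thread), these are the Kafka-source options such a readStream would typically pass. It is worth noting that failOnDataLoss only governs offsets that Kafka has aged out; it does not protect against an inconsistent checkpoint directory itself:

```python
# Hedged sketch of the Kafka-source options the readStream would use,
# including failOnDataLoss=false. Broker and topic names are placeholders.
def kafka_source_options(bootstrap_servers, topic, fail_on_data_loss=False):
    """Options for spark.readStream.format("kafka").options(**...).load().

    failOnDataLoss=false tells the source not to abort when offsets recorded
    in the checkpoint have already been deleted from Kafka (retention); it
    does NOT repair or bypass a corrupted checkpoint's offsets/commits logs.
    """
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "earliest",
        # Spark expects string option values, lowercase for booleans
        "failOnDataLoss": str(fail_on_data_loss).lower(),
    }

opts = kafka_source_options("broker-1:9092", "input-topic")
```

Building the options in one place like this also makes it easy to keep the read side of the job identical across the restarts being discussed, so that only the checkpoint state varies between runs.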