Hi,

Maybe the purpose of the article is different, but instead of:

  sources (trail files) --> Kafka --> Flume --> write to cloud storage --> SSS

a much simpler solution is:

  sources (trail files) --> write to cloud storage --> SSS (Spark Structured Streaming)

Putting in additional components and hops just makes the pipeline a bit
more difficult for me to understand.
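
For what it's worth, a minimal sketch of the simpler path, assuming the
trail files land in a GCS bucket as JSON (the bucket paths and the
two-field schema below are placeholders, not from the article; on
Dataproc the GCS connector is already available):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

spark = SparkSession.builder.appName("trail-files").getOrCreate()

# Streaming file sources require an explicit schema
schema = StructType().add("op", StringType()).add("payload", StringType())

# Structured Streaming picks up new files as they arrive in the bucket
df = (spark.readStream
      .schema(schema)
      .json("gs://my-bucket/trail-files/"))

query = (df.writeStream
         .format("console")
         .option("checkpointLocation", "gs://my-bucket/checkpoints/trail/")
         .start())
query.awaitTermination()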

Regards,
Gourav

On Sat, Feb 26, 2022 at 5:12 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Besides, does the structure of your checkpoint match the one described
> in this article of mine?
>
> Processing Change Data Capture with Spark Structured Streaming
> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=kPyvLBnnSjWPUR8vIq%2FAQw%3D%3D>
>
> Section on "The concept of checkpointing and its value with trigger once"
>
>
> see also
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
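>
> As a minimal sketch of the idea (the paths are placeholders and the
> built-in rate source stands in for a real one), a query started with a
> fixed checkpointLocation and trigger once resumes from its recorded
> offsets on the next run:
>
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.appName("trigger-once-demo").getOrCreate()
>
> # Any streaming source works; rate is built in and needs no setup
> df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
>
> # Trigger once processes what is available, commits to the checkpoint
> # and stops; re-running picks up from the committed offsets
> query = (df.writeStream
>          .format("parquet")
>          .option("path", "/tmp/out/q1")
>          .option("checkpointLocation", "/tmp/ckpt/q1")
>          .trigger(once=True)
>          .start())
> query.awaitTermination()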
>
>
> You can try clearing the checkpoint directory's contents, running your
> Spark job for a while, then killing it with CTRL-C and checking what
> entries are under the sources sub-directory.
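>
> For example, a quick way to see what the checkpoint holds (a sketch
> assuming a local checkpoint path; for a bucket, gsutil ls -r does the
> same job):
>
> import os
>
> checkpoint = "/tmp/ckpt/my_query"
> # A healthy checkpoint typically holds metadata, offsets, commits and,
> # for some sources, a sources sub-directory
> for root, dirs, files in os.walk(checkpoint):
>     for name in files:
>         print(os.path.join(root, name))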
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 26 Feb 2022 at 10:44, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Check the thread I forwarded on how to gracefully shut down Spark
>> Structured Streaming.
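>>
>> The gist of that pattern, as a rough sketch (the marker file path is
>> an assumption and the rate source stands in for your Kafka source), is
>> to poll for an external signal and stop the query instead of killing
>> the driver:
>>
>> import os
>> import time
>>
>> from pyspark.sql import SparkSession
>>
>> spark = SparkSession.builder.appName("graceful-stop").getOrCreate()
>>
>> df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
>> query = (df.writeStream
>>          .format("console")
>>          .option("checkpointLocation", "/tmp/ckpt/rate-demo")
>>          .start())
>>
>> # Touch this file from outside to request a clean stop
>> while query.isActive:
>>     if os.path.exists("/tmp/stop_streaming_query"):
>>         query.stop()
>>     time.sleep(10)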
>>
>> HTH
>>
>> On Fri, 25 Feb 2022 at 22:31, karan alang <karan.al...@gmail.com> wrote:
>>
>>> Hello All,
>>> I'm running a Structured Streaming program on GCP Dataproc, which reads
>>> data from Kafka, does some processing and puts the processed data back
>>> into Kafka. The program was running fine until I killed it (to make minor
>>> changes) and then re-started it.
>>>
>>> It is giving me the error -
>>> pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
>>>
>>> Here is the error:
>>>
>>> 22/02/25 22:14:08 ERROR 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 
>>> 0b73937f-1140-40fe-b201-cf4b7443b599, runId = 
>>> 43c9242d-993a-4b9a-be2b-04d8c9e05b06] terminated with error
>>> java.lang.IllegalStateException: batch 44 doesn't exist
>>>     at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:286)
>>>     at scala.Option.getOrElse(Option.scala:189)
>>>     at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:286)
>>>     at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:197)
>>>     at 
>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>>     at 
>>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>>>     at 
>>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>>>     at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>>>     at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
>>>     at 
>>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>>>     at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
>>>     at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
>>>     at 
>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>>>     at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
>>>     at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>> Traceback (most recent call last):
>>>   File 
>>> "/tmp/0149aedd804c42f288718e013fb16f9c/StructuredStreaming_GCP_Versa_Sase_gcloud.py",
>>>  line 609, in <module>
>>>     query.awaitTermination()
>>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", 
>>> line 101, in awaitTermination
>>>   File 
>>> "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 
>>> 1304, in __call__
>>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 
>>> 117, in deco
>>> pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
>>>
>>>
>>> Question - what is the cause of this error and how can I debug/fix it?
>>> Also, I notice that the checkpoint location occasionally gets corrupted
>>> when I do multiple restarts. After checkpoint corruption, the query does
>>> not return any records.
>>>
>>> For the above issue (as well as when the checkpoint was corrupted), when
>>> I cleared the checkpoint location and re-started the program, it went
>>> through fine.
>>>
>>> Please note: while doing readStream, I've set failOnDataLoss=false.
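>>>
>>> For context, the pipeline looks roughly like this (broker addresses,
>>> topic names and the checkpoint path below are placeholders, not my
>>> actual config):
>>>
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.appName("kafka-pipeline").getOrCreate()
>>>
>>> df = (spark.readStream
>>>       .format("kafka")
>>>       .option("kafka.bootstrap.servers", "broker:9092")
>>>       .option("subscribe", "input-topic")
>>>       .option("failOnDataLoss", "false")
>>>       .load())
>>>
>>> # The Kafka sink expects string/binary key and value columns
>>> query = (df.selectExpr("CAST(key AS STRING) AS key",
>>>                        "CAST(value AS STRING) AS value")
>>>          .writeStream
>>>          .format("kafka")
>>>          .option("kafka.bootstrap.servers", "broker:9092")
>>>          .option("topic", "output-topic")
>>>          .option("checkpointLocation", "/tmp/ckpt/kafka-pipeline")
>>>          .start())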
>>>
>>> Additional details are on Stack Overflow:
>>>
>>>
>>> https://stackoverflow.com/questions/71272328/structuredstreaming-error-pyspark-sql-utils-streamingqueryexception-batch-44
>>>
>>> Any input on this?
>>>
>>> tia!
>>>
>>>
>
