Re: Graceful shutdown SPARK Structured Streaming

2023-02-20 Thread Mich Talebzadeh
>>>>>> [quoted text trimmed; the original message appears in full later in this digest]


Re: Graceful shutdown SPARK Structured Streaming

2023-02-19 Thread Bjørn Jørgensen
>>>>> [quoted text trimmed]

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Graceful shutdown SPARK Structured Streaming

2023-02-08 Thread Brian Wylie
t;>>> if((queue == 'md')) & (status == 'false')):
>>>>   spark_session = s.spark_session(config['common']['appName'])
>>>>   active = spark_session.streams.active
>>>>   for e in active:
>>>>  #print(e)
>>>>  name = e.name
>>>>  if(name == 'md'):
>>>> print(f"""Terminating streaming process {name}""")
>>>> e.stop()
>>>> else:
>>>> print("DataFrame newtopic is empty")
>>>>
>>>> This seems to work as I checked it to ensure that in this case data was
>>>> written and saved to the target sink (BigQuery table). It will wait until
>>>> data is written completely meaning the current streaming message is
>>>> processed and there is a latency there (meaning waiting for graceful
>>>> completion)
>>>>
>>>> This is the output
>>>>
>>>> Terminating streaming process md
>>>> wrote to DB  ## this is the flag  I added to ensure the current
>>>> micro-bath was completed
>>>> 2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md
>>>> [id = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
>>>> 2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>>>>
>>>> The various termination processes are described in
>>>>
>>>> Structured Streaming Programming Guide - Spark 3.1.1 Documentation
>>>> (apache.org)
>>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries>
>>>>
>>>> This is the idea I came up with which allows ending the streaming
>>>> process with least cost.
>>>>
>>>> HTH
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, 5 May 2021 at 17:30, Gourav Sengupta <
>>>> gourav.sengupta.develo...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> just thought of reaching out once again and seeking out your kind help
>>>>> to find out what is the best way to stop SPARK streaming gracefully. Do we
>>>>> still use the methods of creating a file as in SPARK 2.4.x which is 
>>>>> several
>>>>> years old method or do we have a better approach in SPARK 3.1?
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>> -- Forwarded message -
>>>>> From: Gourav Sengupta 
>>>>> Date: Wed, Apr 21, 2021 at 10:06 AM
>>>>> Subject: Graceful shutdown SPARK Structured Streaming
>>>>> To: 
>>>>>
>>>>>
>>>>> Dear friends,
>>>>>
>>>>> is there any documentation available for gracefully stopping SPARK
>>>>> Structured Streaming in 3.1.x?
>>>>>
>>>>> I am referring to articles which are 4 to 5 years old and was
>>>>> wondering whether there is a better way available today to gracefully
>>>>> shutdown a SPARK streaming job.
>>>>>
>>>>> Thanks a ton in advance for all your kind help.
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>> --
>> Best Regards,
>> Ayan Guha
>>
>


Fwd: Graceful shutdown SPARK Structured Streaming

2023-02-07 Thread Mich Talebzadeh
-- Forwarded message -
From: Mich Talebzadeh 
Date: Thu, 6 May 2021 at 20:07
Subject: Re: Graceful shutdown SPARK Structured Streaming
To: ayan guha 
Cc: Gourav Sengupta , user @spark <
user@spark.apache.org>


That is a valid question, and I am not aware of any new addition to Spark
Structured Streaming (SSS) in newer releases for this kind of graceful
shutdown.

Going back to my earlier explanation, there are occasions when you may want
to stop the Spark program gracefully; gracefully meaning that the Spark
application handles the last streaming message completely and then
terminates the application. This is different from invoking interrupts such
as CTRL-C. Of course, one can terminate the process based on the following:

   1. query.awaitTermination() # waits for the termination of this query,
      by stop() or by an error
   2. query.awaitTermination(timeoutMs) # returns true if this query
      terminated within the timeout in milliseconds

So the first one blocks until the query is stopped or fails. The second one
waits for at most the given timeout and reports whether the query terminated
within it.
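
As a rough sketch (not from the actual job), the timeout form can be used as
a polling loop. Here should_stop() is a hypothetical callable you would
supply; note that in PySpark the timeout argument to awaitTermination() is
in seconds, whereas the Scala API takes milliseconds:

def await_with_check(query, should_stop, poll_secs=30):
    # block on the query, re-checking an external stop condition
    # every poll_secs seconds
    while True:
        # awaitTermination(timeout) returns True once the query has
        # terminated, whether via stop() or an error
        if query.awaitTermination(poll_secs):
            break
        if should_stop():
            query.stop()              # interrupts the running query
            query.awaitTermination()  # block until it has fully stopped
            break

Bear in mind that stop() on its own does not guarantee that the in-flight
micro-batch completes, which is why the approach described below stops the
query from inside a foreachBatch call instead.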

The issue is that one needs to predict how long the streaming job needs to
run; clearly, any interrupt at the terminal or OS level (kill process) may
leave the processing terminated without proper completion of the streaming
process.

So I gather that if we agree on what constitutes a graceful shutdown, we can
consider both the tooling Spark itself offers and the solutions we can come
up with ourselves.

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 6 May 2021 at 13:28, ayan guha  wrote:

> What are some other "newer" methodologies?
>
> Really interested to understand what is possible here, as this is a topic
> that has come up in this forum time and again.
>
> On Thu, 6 May 2021 at 5:13 pm, Gourav Sengupta <
> gourav.sengupta.develo...@gmail.com> wrote:
>
>> Hi Mich,
>>
>> thanks a ton for your kind response, looks like we are still using the
>> earlier methodologies for stopping a spark streaming program gracefully.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Wed, May 5, 2021 at 6:04 PM Mich Talebzadeh 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>>
>>> I believe I discussed this in this forum. I sent the following to
>>> spark-dev forum as an add-on to Spark functionality. This is the gist of
>>> it.
>>>
>>>
>>> Spark Structured Streaming AKA SSS is a very useful tool in dealing with
>>> Event Driven Architecture. In an Event Driven Architecture, there is
>>> generally a main loop that listens for events and then triggers a call-back
>>> function when one of those events is detected. In a streaming application
>>> the application waits to receive the source messages in a set interval or
>>> whenever they happen and reacts accordingly.
>>>
>>> [awaitTermination discussion repeated from above; trimmed]
>>>
>>> I have devised a method that allows one to terminate the spark
>>> application internally after processing the last received message. Within
>>> say 2 seconds of the confirmation of shutdown, the process will invoke
>>>

Re: Graceful shutdown SPARK Structured Streaming

2021-05-06 Thread Mich Talebzadeh
>>> Using sensors to implement this, like Airflow, would be expensive as, for
>>> example, reading a file from object storage or from an underlying database
>>> would incur additional I/O overheads through continuous polling.
>>>
>>> So the design had to be incorporated into the streaming process itself.
>>> What I came up with was the addition of a control topic (I call it newtopic
>>> below), which keeps running, triggered every 2 seconds say, and is in json
>>> format with the following structure:
>>>
>>>
>>> root
>>>  |-- newtopic_value: struct (nullable = true)
>>>  |    |-- uuid: string (nullable = true)
>>>  |    |-- timeissued: timestamp (nullable = true)
>>>  |    |-- queue: string (nullable = true)
>>>  |    |-- status: string (nullable = true)
>>>
>>> In the above, queue refers to the business topic, and status is set to
>>> 'true', meaning carry on processing the business stream. This control topic
>>> streaming can be restarted anytime, and status can be set to 'false' if we
>>> want to stop the streaming queue for a given business topic:
>>>
>>> ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
>>> {"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe",
>>> "timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}
>>>
>>> 64a8321c-1593-428b-ae65-89e45ddf0640
>>> {"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",
>>> "timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}
>>>
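>>> For illustration, such a control stream might be assembled along the
>>> following lines (a sketch only: the Kafka broker address and the variable
>>> names are placeholders, not taken from the actual job):
>>>
>>> from pyspark.sql.functions import col, from_json
>>> from pyspark.sql.types import (StructType, StructField, StringType,
>>>                                TimestampType)
>>>
>>> # schema matching the control topic structure shown above
>>> control_schema = StructType([
>>>     StructField("uuid", StringType(), True),
>>>     StructField("timeissued", TimestampType(), True),
>>>     StructField("queue", StringType(), True),
>>>     StructField("status", StringType(), True),
>>> ])
>>>
>>> controlDataFrame = (spark.readStream
>>>     .format("kafka")
>>>     .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
>>>     .option("subscribe", "newtopic")
>>>     .load()
>>>     # the Kafka value arrives as bytes: decode it and parse the json payload
>>>     .select(from_json(col("value").cast("string"),
>>>                       control_schema).alias("newtopic_value"))
>>>     .select("newtopic_value.*"))
>>>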
>>> So how can I stop the business queue once the current business topic
>>> message has been processed? Let us say the source is sending data for a
>>> business topic every 30 seconds, and our control topic sends a one-liner as
>>> above every 2 seconds.
>>>
>>> In your writeStream, add the following line to be able to identify the
>>> topic name:
>>>
>>> trigger(processingTime='30 seconds'). \
>>> queryName('md'). \
>>>
>>> Next, the controlling topic (called newtopic) has the following:
>>>
>>> foreachBatch(sendToControl). \
>>> trigger(processingTime='2 seconds'). \
>>> queryName('newtopic'). \
>>>
>>> That method sendToControl does what is needed:
>>>
>>> from pyspark.sql.functions import col
>>>
>>> def sendToControl(dfnewtopic, batchId):
>>>     # only act if this micro-batch actually contains a control message
>>>     if len(dfnewtopic.take(1)) > 0:
>>>         #print(f"""newtopic batchId is {batchId}""")
>>>         #dfnewtopic.show(10,False)
>>>         queue = dfnewtopic.select(col("queue")).collect()[0][0]
>>>         status = dfnewtopic.select(col("status")).collect()[0][0]
>>>
>>>         if (queue == 'md') and (status == 'false'):
>>>             # s.spark_session() is a local helper that returns the running
>>>             # SparkSession
>>>             spark_session = s.spark_session(config['common']['appName'])
>>>             active = spark_session.streams.active
>>>             for e in active:
>>>                 #print(e)
>>>                 name = e.name
>>>                 if name == 'md':
>>>                     print(f"""Terminating streaming process {name}""")
>>>                     e.stop()
>>>     else:
>>>         print("DataFrame newtopic is empty")
>>>
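>>> Put together, the wiring might look like this (again a sketch:
>>> streamingDataFrame stands for the business stream, controlDataFrame for
>>> the control stream sketched earlier, and the console sink is a placeholder
>>> for the real BigQuery writer):
>>>
>>> # business stream: named 'md' so that sendToControl can find and stop it
>>> resultM = (streamingDataFrame.writeStream
>>>     .format("console")                      # placeholder sink
>>>     .trigger(processingTime='30 seconds')
>>>     .queryName('md')
>>>     .start())
>>>
>>> # control stream: every 2 seconds, hand the latest control message to
>>> # sendToControl via foreachBatch
>>> resultN = (controlDataFrame.writeStream
>>>     .foreachBatch(sendToControl)
>>>     .trigger(processingTime='2 seconds')
>>>     .queryName('newtopic')
>>>     .start())
>>>
>>> # returns as soon as any query (for example 'md') terminates
>>> spark.streams.awaitAnyTermination()
>>>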
>>> This seems to work: I checked it to ensure that in this case data was
>>> written and saved to the target sink (a BigQuery table). It will wait until
>>> data is written completely, meaning the current streaming message is
>>> processed, so there is some latency there (waiting for graceful
>>> completion).
>>>
>>> This is the output
>>>
>>> Terminating streaming process md
>>> wrote to DB  ## this is the flag I added to ensure the current
>>> micro-batch was completed
>>> 2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md
>>> [id = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
>>> 2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>>>
>>> The various termination processes are described in
>>>
>>> Structured Streaming Programming Guide - Spark 3.1.1 Documentation (apache.org)
>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries>
>>>
>>> This is the idea I came up with, which allows ending the streaming
>>> process at the least cost.
>>>
>>> HTH
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> Disclaimer: Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 5 May 2021 at 17:30, Gourav Sengupta <
>>> gourav.sengupta.develo...@gmail.com> wrote:
>>>
>>>> [message quoted in full at the end of this digest; trimmed]
>>>>


Re: Graceful shutdown SPARK Structured Streaming

2021-05-06 Thread ayan guha
>> [quoted text trimmed]

--
Best Regards,
Ayan Guha


Re: Graceful shutdown SPARK Structured Streaming

2021-05-06 Thread Gourav Sengupta
> [quoted text trimmed]


Re: Graceful shutdown SPARK Structured Streaming

2021-05-05 Thread Mich Talebzadeh
[tail of a message that appears in full earlier in this digest; trimmed]


Fwd: Graceful shutdown SPARK Structured Streaming

2021-05-05 Thread Gourav Sengupta
Hi,

just thought of reaching out once again and seeking your kind help to find
out the best way to stop SPARK streaming gracefully. Do we still use the
method of creating a file, as in SPARK 2.4.x, which is several years old, or
do we have a better approach in SPARK 3.1?

Regards,
Gourav Sengupta

-- Forwarded message -
From: Gourav Sengupta 
Date: Wed, Apr 21, 2021 at 10:06 AM
Subject: Graceful shutdown SPARK Structured Streaming
To: 


[forwarded message reproduced in full below; trimmed]


Graceful shutdown SPARK Structured Streaming

2021-04-21 Thread Gourav Sengupta
Dear friends,

is there any documentation available for gracefully stopping SPARK
Structured Streaming in 3.1.x?

I am referring to articles which are 4 to 5 years old and was wondering
whether there is a better way available today to gracefully shutdown a
SPARK streaming job.

Thanks a ton in advance for all your kind help.

Regards,
Gourav Sengupta