What is it that stop(stopGraceFully=True) does then?
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/streaming/context.html#StreamingContext.stop

On Wed, 8 Feb 2023 at 19:22, Brian Wylie <briford.wy...@gmail.com> wrote:

> It's been a few years (so this approach might be out of date) but here's
> what I used for PySpark as part of this SO (
> https://stackoverflow.com/questions/45717433/stop-structured-streaming-query-gracefully/65708677
> )
>
> ```
>
> import time
>
> # Helper method to stop a streaming query
> def stop_stream_query(query, wait_time):
>     """Stop a running streaming query"""
>     while query.isActive:
>         msg = query.status['message']
>         data_avail = query.status['isDataAvailable']
>         trigger_active = query.status['isTriggerActive']
>         # Only stop once the query is idle and past source initialization
>         if not data_avail and not trigger_active and msg != "Initializing sources":
>             print('Stopping query...')
>             query.stop()
>         time.sleep(0.5)
>
>     # Okay wait for the stop to happen
>     print('Awaiting termination...')
>     query.awaitTermination(wait_time)
> ```
>
>
> I'd also be interested if there is a newer/better way to do this, so please
> cc me on updates :)
>
>
> On Thu, May 6, 2021 at 1:08 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> That is a valid question and I am not aware of any new addition to Spark
>> Structured Streaming (SSS) in newer releases for this graceful shutdown.
>>
>> Going back to my earlier explanation, there are occasions that you may
>> want to stop the Spark program gracefully. Gracefully meaning that Spark
>> application handles the last streaming message completely and terminates
>> the application. This is different from invoking interrupts such as CTRL-C.
>> Of course one can terminate the process based on the following
>>
>>
>>    1. query.awaitTermination() # Waits for the termination of this query,
>>       with stop() or with error
>>    2. query.awaitTermination(timeoutMs) # Returns true if this query is
>>       terminated within the timeout in milliseconds.
>>
>> So the first one blocks until the query is stopped with stop() or fails
>> with an error. The second one waits for at most the timeout and returns
>> whether the query terminated within that time.
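The two calls above can be combined into a polling loop, so that the run time does not have to be predicted in advance. This is only a minimal sketch and not something taken from the thread: `run_until_signalled`, `should_stop` and `FakeQuery` are hypothetical names, and the query is only assumed to offer `awaitTermination(timeout)` and `stop()`. Note that PySpark's `awaitTermination` takes its timeout in seconds, whereas the `timeoutMs` variant quoted above is in milliseconds. Whether `stop()` lets the in-flight micro-batch finish is a separate question that this sketch does not settle.

```python
def run_until_signalled(query, should_stop, poll_seconds=2):
    """Block until the query terminates, stopping it once should_stop() is true.

    query: any object with awaitTermination(timeout) -> bool and stop(),
           such as a PySpark StreamingQuery (timeout is in seconds there).
    should_stop: hypothetical zero-argument callable, e.g. a check for a flag.
    Returns the number of polls that were needed.
    """
    polls = 0
    # awaitTermination(timeout) returns True once the query has terminated
    while not query.awaitTermination(poll_seconds):
        polls += 1
        if should_stop():
            query.stop()
    return polls


class FakeQuery:
    """Stand-in for a streaming query so the sketch runs without Spark."""

    def __init__(self):
        self.stopped = False

    def awaitTermination(self, timeout=None):
        return self.stopped

    def stop(self):
        self.stopped = True
```

With a real query, `should_stop` could check whatever shutdown signal the job uses, such as a control message or a marker file.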
>>
>> The issue is that one needs to predict how long the streaming job needs
>> to run. Clearly any interrupt at the terminal or OS level (kill process)
>> may terminate the processing before the streaming process has completed
>> properly.
>> So I gather that if we agree on what constitutes a graceful shutdown, we
>> can consider both what Spark itself offers and what solutions we can come
>> up with.
>>
>> HTH
>>
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 6 May 2021 at 13:28, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> What are some other "newer" methodologies?
>>>
>>> Really interested to understand what is possible here, as this topic
>>> has come up in this forum time and again.
>>>
>>> On Thu, 6 May 2021 at 5:13 pm, Gourav Sengupta <
>>> gourav.sengupta.develo...@gmail.com> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> thanks a ton for your kind response, looks like we are still using the
>>>> earlier methodologies for stopping a spark streaming program gracefully.
>>>>
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>> On Wed, May 5, 2021 at 6:04 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>> I believe I discussed this in this forum. I sent the following to
>>>>> spark-dev forum as an add-on to Spark functionality. This is the gist of
>>>>> it.
>>>>>
>>>>>
>>>>> Spark Structured Streaming AKA SSS is a very useful tool in dealing
>>>>> with Event Driven Architecture. In an Event Driven Architecture, there is
>>>>> generally a main loop that listens for events and then triggers a call-back
>>>>> function when one of those events is detected. In a streaming application
>>>>> the application waits to receive the source messages at a set interval or
>>>>> whenever they happen and reacts accordingly.
>>>>>
>>>>> There are occasions that you may want to stop the Spark program
>>>>> gracefully. Gracefully meaning that Spark application handles the
>>>>> last streaming message completely and terminates the application. This is
>>>>> different from invoking interrupts such as CTRL-C. Of course one can
>>>>> terminate the process based on the following
>>>>>
>>>>>
>>>>>    1. query.awaitTermination() # Waits for the termination of this
>>>>>       query, with stop() or with error
>>>>>    2. query.awaitTermination(timeoutMs) # Returns true if this query is
>>>>>       terminated within the timeout in milliseconds.
>>>>>
>>>>> So the first one blocks until the query is stopped with stop() or fails
>>>>> with an error. The second one waits for at most the timeout and returns
>>>>> whether the query terminated within that time.
>>>>>
>>>>> The issue is that one needs to predict how long the streaming job
>>>>> needs to run. Clearly any interrupt at the terminal or OS level (kill
>>>>> process) may terminate the processing before the streaming process has
>>>>> completed properly.
>>>>>
>>>>> I have devised a method that allows one to terminate the spark
>>>>> application internally after processing the last received message. Within
>>>>> say 2 seconds of the confirmation of shutdown, the process will invoke
>>>>>
>>>>> The question is how to shut down a topic that is still doing work on the
>>>>> message being processed: wait for that work to complete, then shut down
>>>>> the streaming process for the given topic.
>>>>>
>>>>>
>>>>> I thought about this and looked at options. Using sensors to implement
>>>>> this, as Airflow does, would be expensive: reading a file from object
>>>>> storage or from an underlying database, for example, would incur
>>>>> additional I/O overheads through continuous polling.
>>>>>
>>>>>
>>>>> So the design had to be incorporated into the streaming process
>>>>> itself. What I came up with was an addition of a control topic (I call it
>>>>> newtopic below), which keeps running triggered every 2 seconds say and is
>>>>> in json format with the following structure
>>>>>
>>>>>
>>>>> root
>>>>>  |-- newtopic_value: struct (nullable = true)
>>>>>  |    |-- uuid: string (nullable = true)
>>>>>  |    |-- timeissued: timestamp (nullable = true)
>>>>>  |    |-- queue: string (nullable = true)
>>>>>  |    |-- status: string (nullable = true)
>>>>>
>>>>> In the above, queue refers to the business topic and status is set to
>>>>> 'true', meaning carry on processing the business stream. This control
>>>>> topic stream can be restarted at any time, and status can be set to
>>>>> 'false' if we want to stop the streaming queue for a given business topic.
>>>>>
>>>>> ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
>>>>> {"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe",
>>>>> "timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}
>>>>>
>>>>> 64a8321c-1593-428b-ae65-89e45ddf0640
>>>>> {"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",
>>>>> "timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}
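The decision encoded in these control messages can be sketched in plain Python, independent of Spark. This is an illustration under assumptions, not the author's code: `is_stop_request` is a hypothetical helper, and it assumes each control message arrives as a JSON string with the `queue` and `status` fields shown above.

```python
import json


def is_stop_request(message, queue_name):
    """Return True if a control message asks to stop the given business queue.

    message: JSON string shaped like the control records shown above.
    queue_name: name of the business topic, e.g. 'md'.
    """
    record = json.loads(message)
    # status arrives as the string 'true' or 'false', not as a JSON boolean
    return record.get("queue") == queue_name and record.get("status") == "false"
```

Comparing against the string `"false"` rather than a boolean mirrors the schema, where `status` is declared as a string.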
>>>>>
>>>>> So how can I stop the business queue when the current business topic
>>>>> message has been processed? Let us say the source is sending data for a
>>>>> business topic every 30 seconds. Our control topic sends a one liner as
>>>>> above every 2 seconds.
>>>>>
>>>>> In your writestream add the following line to be able to identify
>>>>> topic name
>>>>>
>>>>> trigger(processingTime='30 seconds'). \
>>>>> queryName('md'). \
>>>>>
>>>>> Next the controlling topic (called newtopic)  has the following
>>>>>
>>>>> foreachBatch(sendToControl). \
>>>>> trigger(processingTime='2 seconds'). \
>>>>> queryName('newtopic'). \
>>>>>
>>>>> That method sendToControl does what is needed
>>>>>
>>>>> from pyspark.sql.functions import col
>>>>>
>>>>> def sendToControl(dfnewtopic, batchId):
>>>>>     if len(dfnewtopic.take(1)) > 0:
>>>>>         #print(f"""newtopic batchId is {batchId}""")
>>>>>         #dfnewtopic.show(10,False)
>>>>>         # Read queue and status from the first control message in the batch
>>>>>         queue = dfnewtopic.select(col("queue")).collect()[0][0]
>>>>>         status = dfnewtopic.select(col("status")).collect()[0][0]
>>>>>
>>>>>         if queue == 'md' and status == 'false':
>>>>>             # s and config are the author's own helper module and settings
>>>>>             spark_session = s.spark_session(config['common']['appName'])
>>>>>             active = spark_session.streams.active
>>>>>             for e in active:
>>>>>                 #print(e)
>>>>>                 name = e.name
>>>>>                 if name == 'md':
>>>>>                     print(f"""Terminating streaming process {name}""")
>>>>>                     e.stop()
>>>>>     else:
>>>>>         print("DataFrame newtopic is empty")
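The lookup-and-stop step inside sendToControl can be isolated into a small function, which makes it possible to exercise without a cluster. This is a minimal sketch under assumptions: `stop_queries_named` and `FakeQuery` are hypothetical names, and the queries are only assumed to expose `.name` and `.stop()`, as the entries of `spark_session.streams.active` do in the code above.

```python
def stop_queries_named(active_queries, target_name):
    """Stop every active query whose name matches target_name.

    active_queries: iterable of objects with .name and .stop(), for example
                    the list returned by spark_session.streams.active.
    Returns the names of the queries that were stopped.
    """
    stopped = []
    for query in active_queries:
        if query.name == target_name:
            print(f"Terminating streaming process {query.name}")
            query.stop()
            stopped.append(query.name)
    return stopped


class FakeQuery:
    """Stand-in for a streaming query so the sketch runs without Spark."""

    def __init__(self, name):
        self.name = name
        self.stopped = False

    def stop(self):
        self.stopped = True
```

Matching on the query name is what makes the `queryName('md')` setting on the business stream necessary: the control stream ('newtopic') keeps running while only the named business query is stopped.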
>>>>>
>>>>> This seems to work. I checked that in this case data was written and
>>>>> saved to the target sink (a BigQuery table). It waits until the data is
>>>>> written completely, meaning the current streaming message is processed,
>>>>> so there is some latency while waiting for graceful completion.
>>>>>
>>>>> This is the output
>>>>>
>>>>> Terminating streaming process md
>>>>> wrote to DB  ## this is the flag I added to ensure the current
>>>>> micro-batch was completed
>>>>> 2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md
>>>>> [id = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
>>>>> 2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>>>>>
>>>>> The various termination processes are described in
>>>>>
>>>>> Structured Streaming Programming Guide - Spark 3.1.1 Documentation
>>>>> (apache.org)
>>>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries>
>>>>>
>>>>> This is the idea I came up with which allows ending the streaming
>>>>> process with least cost.
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>> On Wed, 5 May 2021 at 17:30, Gourav Sengupta <
>>>>> gourav.sengupta.develo...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> just thought of reaching out once again to seek your kind help in
>>>>>> finding out the best way to stop SPARK streaming gracefully. Do we
>>>>>> still use the method of creating a file as in SPARK 2.4.x, which is
>>>>>> several years old, or do we have a better approach in SPARK 3.1?
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>> ---------- Forwarded message ---------
>>>>>> From: Gourav Sengupta <gourav.sengupta.develo...@gmail.com>
>>>>>> Date: Wed, Apr 21, 2021 at 10:06 AM
>>>>>> Subject: Graceful shutdown SPARK Structured Streaming
>>>>>> To: <user@spark.apache.org>
>>>>>>
>>>>>>
>>>>>> Dear friends,
>>>>>>
>>>>>> is there any documentation available for gracefully stopping SPARK
>>>>>> Structured Streaming in 3.1.x?
>>>>>>
>>>>>> I am referring to articles which are 4 to 5 years old and was
>>>>>> wondering whether there is a better way available today to gracefully
>>>>>> shut down a SPARK streaming job.
>>>>>>
>>>>>> Thanks a ton in advance for all your kind help.
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
