What is it that stop(stopGraceFully=True) does then? https://spark.apache.org/docs/latest/api/python/_modules/pyspark/streaming/context.html#StreamingContext.stop
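[Per the linked PySpark docs, in the legacy DStreams API stop(stopGraceFully=True) asks the StreamingContext to finish processing all data it has already received before shutting down (the default is stopGraceFully=False). A minimal, hedged sketch of wrapping that call; FakeContext is purely illustrative, standing in for pyspark.streaming.StreamingContext so the snippet runs without a cluster:]

```python
def graceful_stop(ssc, stop_spark_context=True):
    """Request a graceful shutdown of a legacy StreamingContext.

    stopGraceFully=True asks Spark to finish processing all received
    data before stopping, instead of cutting off mid-batch. Pass
    stop_spark_context=False to keep the SparkContext alive.
    """
    ssc.stop(stopSparkContext=stop_spark_context, stopGraceFully=True)


class FakeContext:
    """Illustrative stand-in for pyspark.streaming.StreamingContext."""
    def __init__(self):
        self.kwargs = None

    def stop(self, **kwargs):
        self.kwargs = kwargs  # record what shutdown was requested


ctx = FakeContext()
graceful_stop(ctx, stop_spark_context=False)
print(ctx.kwargs)  # {'stopSparkContext': False, 'stopGraceFully': True}
```

[Note this applies to the old DStreams StreamingContext only; Structured Streaming queries are stopped via StreamingQuery.stop(), as discussed below in the thread.]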
On Wed, 8 Feb 2023 at 19:22, Brian Wylie <briford.wy...@gmail.com> wrote:

> It's been a few years (so this approach might be out of date) but here's
> what I used for PySpark as part of this SO answer (
> https://stackoverflow.com/questions/45717433/stop-structured-streaming-query-gracefully/65708677
> ):
>
> ```
> import time
>
> # Helper method to stop a streaming query
> def stop_stream_query(query, wait_time):
>     """Stop a running streaming query"""
>     while query.isActive:
>         msg = query.status['message']
>         data_avail = query.status['isDataAvailable']
>         trigger_active = query.status['isTriggerActive']
>         if not data_avail and not trigger_active and msg != "Initializing sources":
>             print('Stopping query...')
>             query.stop()
>         time.sleep(0.5)
>
>     # Okay wait for the stop to happen
>     print('Awaiting termination...')
>     query.awaitTermination(wait_time)
> ```
>
> I'd also be interested if there is a newer/better way to do this, so
> please cc me on updates :)
>
> On Thu, May 6, 2021 at 1:08 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> That is a valid question, and I am not aware of any new addition to Spark
>> Structured Streaming (SSS) in newer releases for this graceful shutdown.
>>
>> Going back to my earlier explanation, there are occasions when you may
>> want to stop the Spark program gracefully. Gracefully meaning that the
>> Spark application handles the last streaming message completely and
>> terminates the application. This is different from invoking interrupts
>> such as CTRL-C. Of course one can terminate the process based on the
>> following:
>>
>> 1. query.awaitTermination() # Waits for the termination of this query,
>>    with stop() or with error
>> 2. query.awaitTermination(timeoutMs) # Returns true if this query is
>>    terminated within the timeout in milliseconds.
>>
>> So the first one above waits until an interrupt signal is received.
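[Outside a live cluster, the polling pattern in the stop_stream_query helper quoted above can be exercised with a stub query object. A hedged sketch: FakeQuery is hypothetical, mimicking only the small slice of the StreamingQuery surface the helper touches; real code would pass the query returned by writeStream.start():]

```python
import time

def stop_stream_query(query, wait_time):
    """Stop a running streaming query once it is idle (condensed version
    of the helper above: one status read per poll, short sleep)."""
    while query.isActive:
        status = query.status
        idle = (not status['isDataAvailable']
                and not status['isTriggerActive']
                and status['message'] != "Initializing sources")
        if idle:
            query.stop()
        time.sleep(0.01)  # shortened for the illustration
    query.awaitTermination(wait_time)


class FakeQuery:
    """Stand-in mimicking the part of StreamingQuery used above."""
    def __init__(self, busy_polls):
        self._busy_polls = busy_polls  # report "busy" for this many reads
        self.isActive = True
        self.stopped = False

    @property
    def status(self):
        busy = self._busy_polls > 0
        self._busy_polls -= 1
        return {'message': 'Processing' if busy else 'Waiting for data',
                'isDataAvailable': busy,
                'isTriggerActive': busy}

    def stop(self):
        self.stopped = True
        self.isActive = False

    def awaitTermination(self, timeout):
        return True


q = FakeQuery(busy_polls=3)
stop_stream_query(q, wait_time=5)
print(q.stopped)  # True: stopped only after the query reported idle
```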
>> The second one will count the timeout and will exit when the timeout in
>> milliseconds is reached.
>>
>> The issue is that one needs to predict how long the streaming job needs
>> to run. Clearly, any interrupt at the terminal or OS level (kill process)
>> may end with the processing terminated without proper completion of the
>> streaming process.
>>
>> So I gather that if we agree on what constitutes a graceful shutdown, we
>> can consider both the tool offerings from Spark itself and the solutions
>> we can come up with.
>>
>> HTH
>>
>> view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>> On Thu, 6 May 2021 at 13:28, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> What are some other "newer" methodologies?
>>>
>>> Really interested to understand what is possible here, as this is a
>>> topic that has come up in this forum time and again.
>>>
>>> On Thu, 6 May 2021 at 5:13 pm, Gourav Sengupta <
>>> gourav.sengupta.develo...@gmail.com> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> thanks a ton for your kind response; it looks like we are still using
>>>> the earlier methodologies for stopping a Spark streaming program
>>>> gracefully.
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>> On Wed, May 5, 2021 at 6:04 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I believe I discussed this in this forum. I sent the following to the
>>>>> spark-dev forum as an add-on to Spark functionality. This is the gist
>>>>> of it.
>>>>>
>>>>> Spark Structured Streaming (AKA SSS) is a very useful tool in dealing
>>>>> with Event Driven Architecture.
>>>>> In an Event Driven Architecture, there is generally a main loop that
>>>>> listens for events and then triggers a callback function when one of
>>>>> those events is detected. In a streaming application the application
>>>>> waits to receive the source messages in a set interval or whenever
>>>>> they happen, and reacts accordingly.
>>>>>
>>>>> There are occasions when you may want to stop the Spark program
>>>>> gracefully. Gracefully meaning that the Spark application handles the
>>>>> last streaming message completely and terminates the application. This
>>>>> is different from invoking interrupts such as CTRL-C. Of course one
>>>>> can terminate the process based on the following:
>>>>>
>>>>> 1. query.awaitTermination() # Waits for the termination of this
>>>>>    query, with stop() or with error
>>>>> 2. query.awaitTermination(timeoutMs) # Returns true if this query is
>>>>>    terminated within the timeout in milliseconds.
>>>>>
>>>>> So the first one above waits until an interrupt signal is received.
>>>>> The second one will count the timeout and will exit when the timeout
>>>>> in milliseconds is reached.
>>>>>
>>>>> The issue is that one needs to predict how long the streaming job
>>>>> needs to run. Clearly, any interrupt at the terminal or OS level (kill
>>>>> process) may end with the processing terminated without proper
>>>>> completion of the streaming process.
>>>>>
>>>>> I have devised a method that allows one to terminate the Spark
>>>>> application internally after processing the last received message:
>>>>> within say 2 seconds of the confirmation of shutdown, the process will
>>>>> invoke the shutdown. The problem to solve, then, is how to stop the
>>>>> topic doing work for the message being processed, wait for it to
>>>>> complete, and shut down the streaming process for that topic.
>>>>>
>>>>> I thought about this and looked at options.
>>>>> Using sensors to implement this, as Airflow does, would be expensive,
>>>>> as for example reading a file from object storage or from an
>>>>> underlying database would incur additional I/O overheads through
>>>>> continuous polling.
>>>>>
>>>>> So the design had to be incorporated into the streaming process
>>>>> itself. What I came up with was the addition of a control topic (I
>>>>> call it newtopic below), which keeps running, triggered every 2
>>>>> seconds say, and is in JSON format with the following structure:
>>>>>
>>>>> root
>>>>>  |-- newtopic_value: struct (nullable = true)
>>>>>  |    |-- uuid: string (nullable = true)
>>>>>  |    |-- timeissued: timestamp (nullable = true)
>>>>>  |    |-- queue: string (nullable = true)
>>>>>  |    |-- status: string (nullable = true)
>>>>>
>>>>> In the above, queue refers to the business topic, and status is set
>>>>> to 'true', meaning carry on processing the business stream. This
>>>>> control topic streaming can be restarted anytime, and status can be
>>>>> set to 'false' if we want to stop the streaming queue for a given
>>>>> business topic:
>>>>>
>>>>> ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
>>>>> {"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe",
>>>>> "timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}
>>>>>
>>>>> 64a8321c-1593-428b-ae65-89e45ddf0640
>>>>> {"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",
>>>>> "timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}
>>>>>
>>>>> So how can I stop the business queue when the current business topic
>>>>> message has been processed? Let us say the source is sending data for
>>>>> a business topic every 30 seconds. Our control topic sends a one-liner
>>>>> as above every 2 seconds.
>>>>>
>>>>> In your writeStream, add the following lines to be able to identify
>>>>> the topic name:
>>>>>
>>>>> trigger(processingTime='30 seconds'). \
>>>>> queryName('md'). \
>>>>> Next, the controlling topic (called newtopic) has the following:
>>>>>
>>>>> foreachBatch(sendToControl). \
>>>>> trigger(processingTime='2 seconds'). \
>>>>> queryName('newtopic'). \
>>>>>
>>>>> That method sendToControl does what is needed:
>>>>>
>>>>> def sendToControl(dfnewtopic, batchId):
>>>>>     if len(dfnewtopic.take(1)) > 0:
>>>>>         #print(f"""newtopic batchId is {batchId}""")
>>>>>         #dfnewtopic.show(10,False)
>>>>>         queue = dfnewtopic.select(col("queue")).collect()[0][0]
>>>>>         status = dfnewtopic.select(col("status")).collect()[0][0]
>>>>>
>>>>>         if (queue == 'md') and (status == 'false'):
>>>>>             spark_session = s.spark_session(config['common']['appName'])
>>>>>             active = spark_session.streams.active
>>>>>             for e in active:
>>>>>                 #print(e)
>>>>>                 name = e.name
>>>>>                 if name == 'md':
>>>>>                     print(f"""Terminating streaming process {name}""")
>>>>>                     e.stop()
>>>>>     else:
>>>>>         print("DataFrame newtopic is empty")
>>>>>
>>>>> This seems to work, as I checked it to ensure that in this case data
>>>>> was written and saved to the target sink (BigQuery table).
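[As an aside, the stop/continue decision inside sendToControl can be isolated and checked without a cluster. A hedged sketch: the helper name should_stop_queue is hypothetical; in the real job the queue and status values come from the control-topic micro-batch, and plain Python `and` is used rather than `&`, since these are ordinary booleans, not Spark Column expressions:]

```python
def should_stop_queue(queue, status, target_queue='md'):
    """Return True when a control message asks us to stop target_queue.

    Mirrors the condition in sendToControl: stop only when the control
    message names our business topic AND carries status 'false'.
    """
    return queue == target_queue and status == 'false'


# Control messages shaped like the samples shown earlier in the thread:
print(should_stop_queue('md', 'true'))     # False: keep processing
print(should_stop_queue('md', 'false'))    # True: stop the 'md' stream
print(should_stop_queue('other', 'false')) # False: different business topic
```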
>>>>> It will wait until data is written completely, meaning the current
>>>>> streaming message is processed, and there is a latency there (meaning
>>>>> waiting for graceful completion).
>>>>>
>>>>> This is the output:
>>>>>
>>>>> Terminating streaming process md
>>>>> wrote to DB  ## this is the flag I added to ensure the current
>>>>> micro-batch was completed
>>>>> 2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md
>>>>> [id = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
>>>>> 2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>>>>>
>>>>> The various termination processes are described in the
>>>>> Structured Streaming Programming Guide - Spark 3.1.1 Documentation
>>>>> (apache.org)
>>>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries>
>>>>>
>>>>> This is the idea I came up with, which allows ending the streaming
>>>>> process with the least cost.
>>>>>
>>>>> HTH
>>>>>
>>>>> On Wed, 5 May 2021 at 17:30, Gourav Sengupta <
>>>>> gourav.sengupta.develo...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> just thought of reaching out once again and seeking your kind help
>>>>>> to find out the best way to stop SPARK streaming gracefully. Do we
>>>>>> still use the method of creating a file, as in SPARK 2.4.x, which is
>>>>>> a several-years-old method, or do we have a better approach in
>>>>>> SPARK 3.1?
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>> ---------- Forwarded message ---------
>>>>>> From: Gourav Sengupta <gourav.sengupta.develo...@gmail.com>
>>>>>> Date: Wed, Apr 21, 2021 at 10:06 AM
>>>>>> Subject: Graceful shutdown SPARK Structured Streaming
>>>>>> To: <user@spark.apache.org>
>>>>>>
>>>>>> Dear friends,
>>>>>>
>>>>>> is there any documentation available for gracefully stopping SPARK
>>>>>> Structured Streaming in 3.1.x?
>>>>>>
>>>>>> I am referring to articles which are 4 to 5 years old and was
>>>>>> wondering whether there is a better way available today to gracefully
>>>>>> shut down a SPARK streaming job.
>>>>>>
>>>>>> Thanks a ton in advance for all your kind help.
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge
+47 480 94 297
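[For reference, the "creating a file" method Gourav asks about typically looks like the sketch below: the driver loops on awaitTermination with a short timeout and stops the query once a marker file appears, so the current micro-batch still completes. Hedged: run_until_marker and FakeQuery are illustrative names, the marker path is arbitrary, and note that awaitTermination's timeout is in seconds in PySpark but milliseconds in the Scala API:]

```python
import os
import tempfile

def run_until_marker(query, marker_path, poll_timeout=1):
    """Block until the query ends on its own or a stop-marker file appears.

    poll_timeout is passed straight to awaitTermination (seconds in
    PySpark); awaitTermination returns True once the query terminates.
    """
    while query.isActive:
        if query.awaitTermination(poll_timeout):
            break
        if os.path.exists(marker_path):
            query.stop()  # in-flight micro-batch finishes before stop completes


class FakeQuery:
    """Illustrative stand-in for a StreamingQuery from writeStream.start()."""
    def __init__(self):
        self.isActive = True

    def awaitTermination(self, timeout):
        return not self.isActive  # terminated only after stop()

    def stop(self):
        self.isActive = False


marker = os.path.join(tempfile.gettempdir(), "stop_streaming_demo.marker")
open(marker, "w").close()        # an operator "drops the file" to request a stop
q = FakeQuery()
run_until_marker(q, marker)
os.remove(marker)
print(q.isActive)  # False: the query was stopped via the marker file
```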