Hi Mich,

thanks a ton for your kind response. It looks like we are still using the earlier methodologies for stopping a Spark streaming program gracefully.

Regards,
Gourav Sengupta

On Wed, May 5, 2021 at 6:04 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> I believe I discussed this in this forum. I sent the following to the
> spark-dev forum as an add-on to Spark functionality. This is the gist of
> it.
>
> Spark Structured Streaming, AKA SSS, is a very useful tool in dealing with
> Event Driven Architecture. In an Event Driven Architecture, there is
> generally a main loop that listens for events and then triggers a call-back
> function when one of those events is detected. In a streaming application,
> the application waits to receive the source messages at a set interval or
> whenever they happen, and reacts accordingly.
>
> There are occasions when you may want to stop the Spark program gracefully.
> Gracefully means that the Spark application handles the last streaming
> message completely and then terminates. This is different from invoking
> interrupts such as CTRL-C. Of course, one can terminate the process based
> on the following:
>
> 1. query.awaitTermination()  # Waits for the termination of this query,
>    with stop() or with error
> 2. query.awaitTermination(timeoutMs)  # Returns true if this query is
>    terminated within the timeout in milliseconds.
>
> So the first one above blocks until the query is stopped or fails, which in
> practice often means until an interrupt signal is received. The second one
> returns once the timeout is reached (milliseconds in the Scala/Java API;
> PySpark's awaitTermination(timeout) takes seconds).
>
> The issue is that one needs to predict how long the streaming job needs to
> run. Clearly, any interrupt at the terminal or OS level (kill process) may
> terminate the processing without proper completion of the streaming
> process.
>
> I have devised a method that allows one to terminate the Spark application
> internally after processing the last received message.
> Within say 2 seconds of the confirmation of shutdown, the process will invoke
>
> How to shut down the topic doing work for the message being processed, wait
> for it to complete and shut down the streaming process for a given topic.
>
> I thought about this and looked at options. Using sensors to implement
> this, as in Airflow, would be expensive: for example, reading a file from
> object storage or from an underlying database would have incurred
> additional I/O overheads through continuous polling.
>
> So the design had to be incorporated into the streaming process itself.
> What I came up with was the addition of a control topic (I call it newtopic
> below), which keeps running, triggered every 2 seconds say, and is in JSON
> format with the following structure:
>
> root
>  |-- newtopic_value: struct (nullable = true)
>  |    |-- uuid: string (nullable = true)
>  |    |-- timeissued: timestamp (nullable = true)
>  |    |-- queue: string (nullable = true)
>  |    |-- status: string (nullable = true)
>
> In the above, queue refers to the business topic and status is set to
> 'true', meaning carry on processing the business stream. This control topic
> streaming can be restarted at any time, and status can be set to 'false' if
> we want to stop the streaming queue for a given business topic.
>
> ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
> {"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe", "timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}
>
> 64a8321c-1593-428b-ae65-89e45ddf0640
> {"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640", "timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}
>
> So how can I stop the business queue when the current business topic
> message has been processed? Let us say the source is sending data for a
> business topic every 30 seconds. Our control topic sends a one-liner as
> above every 2 seconds.
>
> In your writeStream, add the following line to be able to identify the
> topic name:
>
>     trigger(processingTime='30 seconds'). \
>     queryName('md'). \
>
> Next, the controlling topic (called newtopic) has the following:
>
>     foreachBatch(sendToControl). \
>     trigger(processingTime='2 seconds'). \
>     queryName('newtopic'). \
>
> The method sendToControl does what is needed:
>
>     from pyspark.sql.functions import col
>
>     # s and config come from the author's own modules (not shown here)
>     def sendToControl(dfnewtopic, batchId):
>         # Act only when the control micro-batch contains at least one row
>         if len(dfnewtopic.take(1)) > 0:
>             #print(f"""newtopic batchId is {batchId}""")
>             #dfnewtopic.show(10,False)
>             # Read queue and status from the first control row
>             queue = dfnewtopic.select(col("queue")).collect()[0][0]
>             status = dfnewtopic.select(col("status")).collect()[0][0]
>
>             if queue == 'md' and status == 'false':
>                 spark_session = s.spark_session(config['common']['appName'])
>                 active = spark_session.streams.active
>                 # Stop only the business query named 'md'
>                 for e in active:
>                     #print(e)
>                     name = e.name
>                     if name == 'md':
>                         print(f"""Terminating streaming process {name}""")
>                         e.stop()
>         else:
>             print("DataFrame newtopic is empty")
>
> This seems to work: I checked that in this case data was written and saved
> to the target sink (BigQuery table). It will wait until data is written
> completely, meaning the current streaming message is processed, so there is
> some latency there (waiting for graceful completion).
>
> This is the output:
>
> Terminating streaming process md
> wrote to DB  ## this is the flag I added to ensure the current micro-batch was completed
> 2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md [id = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId = 2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>
> The various termination processes are described in
>
> Structured Streaming Programming Guide - Spark 3.1.1 Documentation (apache.org)
> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries>
>
> This is the idea I came up with, which allows ending the streaming process
> at the least cost.
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> *Disclaimer:* Use it at your own risk.
> Any and all responsibility for any loss, damage or destruction of data or
> any other property which may arise from relying on this email's technical
> content is explicitly disclaimed. The author will in no case be liable for
> any monetary damages arising from such loss, damage or destruction.
>
> On Wed, 5 May 2021 at 17:30, Gourav Sengupta <gourav.sengupta.develo...@gmail.com> wrote:
>
>> Hi,
>>
>> just thought of reaching out once again and seeking your kind help to
>> find out what is the best way to stop SPARK streaming gracefully. Do we
>> still use the method of creating a file as in SPARK 2.4.x, which is
>> several years old, or do we have a better approach in SPARK 3.1?
>>
>> Regards,
>> Gourav Sengupta
>>
>> ---------- Forwarded message ---------
>> From: Gourav Sengupta <gourav.sengupta.develo...@gmail.com>
>> Date: Wed, Apr 21, 2021 at 10:06 AM
>> Subject: Graceful shutdown SPARK Structured Streaming
>> To: <user@spark.apache.org>
>>
>> Dear friends,
>>
>> is there any documentation available for gracefully stopping SPARK
>> Structured Streaming in 3.1.x?
>>
>> I am referring to articles which are 4 to 5 years old and was wondering
>> whether there is a better way available today to gracefully shut down a
>> SPARK streaming job.
>>
>> Thanks a ton in advance for all your kind help.
>>
>> Regards,
>> Gourav Sengupta
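
For anyone who wants to try the control-topic idea quoted above without a Spark cluster, here is a minimal Python sketch of just the decision logic inside sendToControl. The names should_stop, stop_named_queries and FakeQuery are hypothetical and exist only for illustration; in a real job the list would presumably come from spark_session.streams.active, whose elements expose name and stop() as used in the quoted code. The sketch does not touch Spark and does not reproduce the wait-for-the-current-micro-batch behaviour reported in the quoted message.

```python
from dataclasses import dataclass
from typing import Iterable, List


def should_stop(record: dict, queue: str) -> bool:
    """Return True when a control record asks to stop the given business queue."""
    return record.get("queue") == queue and record.get("status") == "false"


def stop_named_queries(active: Iterable, name: str) -> List[str]:
    """Call stop() on every active query whose name matches; return the names stopped."""
    stopped_names = []
    for query in active:
        if query.name == name:
            query.stop()
            stopped_names.append(query.name)
    return stopped_names


@dataclass
class FakeQuery:
    """Hypothetical stand-in for a streaming query (assumption, not a Spark class)."""
    name: str
    stopped: bool = False

    def stop(self) -> None:
        self.stopped = True


# Control record copied from the second sample message in the thread.
control = {
    "uuid": "64a8321c-1593-428b-ae65-89e45ddf0640",
    "timeissued": "2021-04-23T09:49:37",
    "queue": "md",
    "status": "false",
}

# Two running queries: the business stream and the control stream itself.
queries = [FakeQuery("md"), FakeQuery("newtopic")]

# Stop only the business query, and only when the control record says so.
stopped = stop_named_queries(queries, "md") if should_stop(control, "md") else []
```

Keeping the decision (should_stop) separate from the side effect (stop_named_queries) makes it possible to unit-test the control rules before wiring them into foreachBatch.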