What are some other "newer" methodologies? Really interested to understand what is possible here, as this is a topic that has come up in this forum time and again.
On Thu, 6 May 2021 at 5:13 pm, Gourav Sengupta <gourav.sengupta.develo...@gmail.com> wrote:

> Hi Mich,
>
> thanks a ton for your kind response. It looks like we are still using the
> earlier methodologies for stopping a Spark streaming program gracefully.
>
> Regards,
> Gourav Sengupta
>
> On Wed, May 5, 2021 at 6:04 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> I believe I discussed this in this forum. I sent the following to the
>> spark-dev forum as an add-on to Spark functionality. This is the gist of it.
>>
>> Spark Structured Streaming (SSS) is a very useful tool for dealing with
>> event-driven architectures. In an event-driven architecture there is
>> generally a main loop that listens for events and triggers a callback
>> function when one of those events is detected. In a streaming application,
>> the application waits to receive the source messages at a set interval, or
>> whenever they happen, and reacts accordingly.
>>
>> There are occasions when you may want to stop the Spark program
>> gracefully. Gracefully means that the Spark application handles the last
>> streaming message completely and then terminates. This is different from
>> invoking an interrupt such as CTRL-C. Of course, one can terminate the
>> process based on the following:
>>
>> 1. query.awaitTermination()          # waits for the termination of this
>>                                        query, by stop() or by an error
>> 2. query.awaitTermination(timeoutMs) # returns true if this query is
>>                                        terminated within the timeout in
>>                                        milliseconds
>>
>> The first one waits until an interrupt signal is received. The second one
>> counts down and exits when the timeout in milliseconds is reached.
>>
>> The issue is that one needs to predict how long the streaming job needs
>> to run. Clearly, any interrupt at the terminal or OS level (killing the
>> process) may terminate the processing without proper completion of the
>> streaming job.
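The distinction drawn above between an OS-level interrupt and a graceful stop can be sketched outside Spark with a minimal plain-Python event loop. This is an illustration of the pattern only, with hypothetical names, not Spark code: a stop request is honoured only between messages, so in-flight work always completes first.

```python
import queue
import threading

def run_stream(messages: "queue.Queue", stop_flag: threading.Event, processed: list):
    """Minimal event loop: drain in-flight work before honouring a stop request."""
    while True:
        # A graceful stop is only honoured once the queue is empty,
        # never in the middle of processing a message.
        if stop_flag.is_set() and messages.empty():
            break
        try:
            msg = messages.get(timeout=0.1)
        except queue.Empty:
            continue
        processed.append(msg.upper())  # stand-in for the real processing step

# Usage: queue three messages, request a stop up-front, and observe that
# all three are still processed before the loop exits.
q = queue.Queue()
for m in ("a", "b", "c"):
    q.put(m)
done: list = []
flag = threading.Event()
flag.set()                 # stop requested before the loop even starts...
run_stream(q, flag, done)  # ...but the queued messages finish first
print(done)                # -> ['A', 'B', 'C']
```

A CTRL-C style interrupt, by contrast, would abandon whatever message was mid-flight, which is exactly what the approach below avoids.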
>> I have devised a method that allows one to terminate the Spark
>> application internally after processing the last received message,
>> within, say, 2 seconds of the confirmation of shutdown.
>>
>> The question is how to let the topic finish the work for the message
>> being processed, wait for it to complete, and then shut down the
>> streaming process for that given topic.
>>
>> I thought about this and looked at the options. Using sensors to
>> implement this, as Airflow does, would be expensive: reading a file from
>> object storage or from an underlying database would incur additional I/O
>> overhead through continuous polling.
>>
>> So the design had to be incorporated into the streaming process itself.
>> What I came up with was the addition of a control topic (I call it
>> newtopic below), which keeps running, triggered every 2 seconds say, and
>> is in JSON format with the following structure:
>>
>> root
>>  |-- newtopic_value: struct (nullable = true)
>>  |    |-- uuid: string (nullable = true)
>>  |    |-- timeissued: timestamp (nullable = true)
>>  |    |-- queue: string (nullable = true)
>>  |    |-- status: string (nullable = true)
>>
>> Here queue refers to the business topic, and status is set to 'true',
>> meaning carry on processing the business stream. This control topic
>> stream can be restarted at any time, and status can be set to 'false' if
>> we want to stop the streaming queue for a given business topic:
>>
>> ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
>> {"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe",
>>  "timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}
>>
>> 64a8321c-1593-428b-ae65-89e45ddf0640
>> {"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",
>>  "timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}
>>
>> So how can I stop the business queue once the current business topic
>> message has been processed? Let us say the source sends data for a
>> business topic every 30 seconds.
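The control record above is small enough to parse and validate with the standard library alone. A sketch (field names taken from the schema above; `parse_control` and `stop_requested` are names introduced here for illustration):

```python
import json
from datetime import datetime

REQUIRED_FIELDS = ("uuid", "timeissued", "queue", "status")

def parse_control(raw: str) -> dict:
    """Parse one control-topic record and check it matches the schema above."""
    rec = json.loads(raw)
    missing = [f for f in REQUIRED_FIELDS if f not in rec]
    if missing:
        raise ValueError(f"control record missing fields: {missing}")
    # timeissued is an ISO-8601 timestamp in the sample records
    rec["timeissued"] = datetime.fromisoformat(rec["timeissued"])
    # status arrives as the strings 'true'/'false', not as a JSON boolean
    rec["stop_requested"] = rec["status"] == "false"
    return rec

# Usage: the second sample record above carries status 'false', i.e. a
# request to stop the 'md' business stream.
sample = ('{"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",'
          '"timeissued":"2021-04-23T09:49:37","queue":"md","status":"false"}')
rec = parse_control(sample)
print(rec["queue"], rec["stop_requested"])   # -> md True
```

Note that because status is a string rather than a boolean in the payload, the explicit comparison against 'false' matters; a truthiness check on a non-empty string would always pass.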
>> Our control topic sends a one-liner, as above, every 2 seconds.
>>
>> In your writeStream, add the following lines so the business topic can
>> be identified by name:
>>
>>     trigger(processingTime='30 seconds'). \
>>     queryName('md'). \
>>
>> Next, the controlling topic (called newtopic) has the following:
>>
>>     foreachBatch(sendToControl). \
>>     trigger(processingTime='2 seconds'). \
>>     queryName('newtopic'). \
>>
>> The method sendToControl does what is needed:
>>
>>     def sendToControl(dfnewtopic, batchId):
>>         if len(dfnewtopic.take(1)) > 0:
>>             queue = dfnewtopic.select(col("queue")).collect()[0][0]
>>             status = dfnewtopic.select(col("status")).collect()[0][0]
>>
>>             if queue == 'md' and status == 'false':
>>                 spark_session = s.spark_session(config['common']['appName'])
>>                 active = spark_session.streams.active
>>                 for e in active:
>>                     name = e.name
>>                     if name == 'md':
>>                         print(f"""Terminating streaming process {name}""")
>>                         e.stop()
>>         else:
>>             print("DataFrame newtopic is empty")
>>
>> This seems to work: I checked it to ensure that in this case the data
>> was written and saved to the target sink (a BigQuery table).
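The stop logic inside sendToControl can be exercised without a Spark cluster by stubbing out the active-queries list. A sketch with hypothetical stand-in classes (FakeQuery and stop_query_by_name are illustration names, not Spark APIs; a real StreamingQuery exposes the same `name` attribute and `stop()` method used here):

```python
class FakeQuery:
    """Stand-in for a Spark StreamingQuery: just a name and a stop() method."""
    def __init__(self, name: str):
        self.name = name
        self.stopped = False

    def stop(self):
        self.stopped = True

def stop_query_by_name(active, queue: str, status: str, target: str = "md"):
    """Mirror of the check in sendToControl: stop the business query only
    when the control record names it AND carries status 'false'."""
    stopped = []
    if queue == target and status == "false":
        for q in active:
            if q.name == target:
                q.stop()
                stopped.append(q.name)
    return stopped

# Usage: two live queries; a 'false' control record for 'md' stops only 'md',
# leaving the control query itself running.
active = [FakeQuery("md"), FakeQuery("newtopic")]
stopped = stop_query_by_name(active, queue="md", status="false")
print(stopped)             # -> ['md']
print(active[1].stopped)   # -> False
```

The key property this isolates is that the control query keeps running after the business query stops, so processing can later be resumed by publishing a 'true' record and restarting the business stream.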
>> It will wait until the data is written completely, meaning the current
>> streaming message is processed; there is some latency there (waiting for
>> graceful completion).
>>
>> This is the output:
>>
>>     Terminating streaming process md
>>     wrote to DB   ## this is the flag I added to ensure the current
>>                   ## micro-batch was completed
>>     2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md
>>     [id = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
>>     2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>>
>> The various termination processes are described in the Structured
>> Streaming Programming Guide - Spark 3.1.1 Documentation:
>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
>>
>> This is the idea I came up with, which allows ending the streaming
>> process at least cost.
>>
>> HTH
>>
>> view my LinkedIn profile:
>> https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary
>> damages arising from such loss, damage or destruction.
>>
>> On Wed, 5 May 2021 at 17:30, Gourav Sengupta
>> <gourav.sengupta.develo...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> just thought of reaching out once again and seeking your kind help to
>>> find out the best way to stop Spark streaming gracefully. Do we still
>>> use the method of creating a file, as in Spark 2.4.x, which is several
>>> years old, or do we have a better approach in Spark 3.1?
>>> Regards,
>>> Gourav Sengupta
>>>
>>> ---------- Forwarded message ---------
>>> From: Gourav Sengupta <gourav.sengupta.develo...@gmail.com>
>>> Date: Wed, Apr 21, 2021 at 10:06 AM
>>> Subject: Graceful shutdown SPARK Structured Streaming
>>> To: <user@spark.apache.org>
>>>
>>> Dear friends,
>>>
>>> is there any documentation available for gracefully stopping Spark
>>> Structured Streaming in 3.1.x?
>>>
>>> I am referring to articles which are 4 to 5 years old and was wondering
>>> whether there is a better way available today to gracefully shut down a
>>> Spark streaming job.
>>>
>>> Thanks a ton in advance for all your kind help.
>>>
>>> Regards,
>>> Gourav Sengupta

--
Best Regards,
Ayan Guha