Hi Mich,

Thanks a ton for your kind response. It looks like we are still using the
earlier methods for stopping a Spark streaming program gracefully.


Regards,
Gourav Sengupta

On Wed, May 5, 2021 at 6:04 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
> Hi,
>
>
> I believe I discussed this in this forum. I sent the following to
> spark-dev forum as an add-on to Spark functionality. This is the gist of
> it.
>
>
> Spark Structured Streaming AKA SSS is a very useful tool in dealing with
> Event Driven Architecture. In an Event Driven Architecture, there is
> generally a main loop that listens for events and then triggers a call-back
> function when one of those events is detected. In a streaming application
> the application waits to receive the source messages in a set interval or
> whenever they happen and reacts accordingly.
>
> There are occasions when you may want to stop the Spark program
> gracefully. Gracefully here means that the Spark application handles the
> last streaming message completely and then terminates. This is different
> from invoking interrupts such as CTRL-C. Of course one can terminate the
> process based on the following:
>
>
>    1. query.awaitTermination() # Waits for the termination of this query,
>       with stop() or with error
>    2. query.awaitTermination(timeoutMs) # Returns true if this query is
>       terminated within the timeout in milliseconds.
>
> So the first one above blocks until the query is terminated, either by
> stop() or by an error. The second one waits for at most the given timeout
> and returns once the timeout in milliseconds is reached.
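
To make the difference between the two calls concrete, here is a minimal sketch. It does not use Spark: `FakeQuery` is a hypothetical stand-in that only mimics the three `StreamingQuery` members used here (`awaitTermination`, `stop`, `isActive`), and `run_for_at_most` is an illustrative helper, not part of any library. One thing to watch: in PySpark the timeout argument of `awaitTermination` is given in seconds, whereas the Scala/Java method takes milliseconds.

```python
import threading


class FakeQuery:
    """Hypothetical stand-in for a StreamingQuery, for illustration only."""

    def __init__(self):
        self._terminated = threading.Event()

    def awaitTermination(self, timeout=None):
        # No timeout: block until the query terminates, return None.
        if timeout is None:
            self._terminated.wait()
            return None
        # With a timeout (seconds): True if terminated in time, else False.
        return self._terminated.wait(timeout)

    def stop(self):
        self._terminated.set()

    @property
    def isActive(self):
        return not self._terminated.is_set()


def run_for_at_most(query, timeout_seconds):
    """Wait up to timeout_seconds; stop the query if it is still running."""
    finished = query.awaitTermination(timeout_seconds)
    if not finished:
        # The timeout elapsing does not end the query by itself.
        query.stop()
    return finished
```

The point of the sketch is that a timed wait only reports whether the query ended in time. If it returns False, the query is still active and something still has to call `stop()`.
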
>
> The issue is that one needs to predict how long the streaming job needs to
> run. Clearly any interrupt at the terminal or OS level (kill process) may
> terminate the processing without proper completion of the streaming
> process.
>
> I have devised a method that allows one to terminate the spark application
> internally after processing the last received message. Within say 2 seconds
> of the confirmation of shutdown, the process will invoke
>
> The question is how to let the topic finish the work for the message being
> processed, wait for it to complete, and then shut down the streaming
> process for a given topic.
>
>
> I thought about this and looked at options. Using sensors to implement
> this, as Airflow does, would be expensive: for example, reading a file
> from object storage or from an underlying database would incur additional
> I/O overhead through continuous polling.
>
>
> So the design had to be incorporated into the streaming process itself.
> What I came up with was the addition of a control topic (I call it newtopic
> below), which keeps running, triggered say every 2 seconds, and carries
> messages in JSON format with the following structure
>
>
> root
>  |-- newtopic_value: struct (nullable = true)
>  |    |-- uuid: string (nullable = true)
>  |    |-- timeissued: timestamp (nullable = true)
>  |    |-- queue: string (nullable = true)
>  |    |-- status: string (nullable = true)
>
> In the above, queue refers to the business topic and status is set to
> 'true', meaning carry on processing the business stream. This control topic
> stream can be restarted at any time, and status can be set to 'false' if we
> want to stop the streaming queue for a given business topic
>
> ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
> {"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe",
> "timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}
>
> 64a8321c-1593-428b-ae65-89e45ddf0640
> {"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",
> "timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}
>
> So how can I stop the business queue when the current business topic
> message has been processed? Let us say the source is sending data for a
> business topic every 30 seconds. Our control topic sends a one liner as
> above every 2 seconds.
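
As a sketch of how the two control messages above could be interpreted, the following plain Python (no Spark needed) parses one message and checks whether it asks a given queue to stop. The function names `parse_control_message` and `should_stop` are made up for illustration; the field names and the string values 'true' and 'false' are taken from the examples above.

```python
import json
from datetime import datetime


def parse_control_message(raw):
    """Parse one control-topic JSON value into a dict with typed fields."""
    msg = json.loads(raw)
    return {
        "uuid": msg["uuid"],
        "timeissued": datetime.fromisoformat(msg["timeissued"]),
        "queue": msg["queue"],
        # status travels as the string 'true' or 'false', not a JSON boolean
        "status": msg["status"],
    }


def should_stop(msg, queue_name):
    """True when the message tells the named business queue to stop."""
    return msg["queue"] == queue_name and msg["status"] == "false"


carry_on = parse_control_message(
    '{"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe", '
    '"timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}'
)
halt = parse_control_message(
    '{"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640", '
    '"timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}'
)
```

Because status is a string, a plain truthiness test would treat 'false' as true, so the comparison has to be against the literal text.
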
>
> In your writeStream, add the following line to be able to identify the
> query by topic name
>
> trigger(processingTime='30 seconds'). \
> queryName('md'). \
>
> Next, the controlling topic (called newtopic) has the following
>
> foreachBatch(sendToControl). \
> trigger(processingTime='2 seconds'). \
> queryName('newtopic'). \
>
> That method sendToControl does what is needed
>
> from pyspark.sql.functions import col
>
> def sendToControl(dfnewtopic, batchId):
>     # Only act when the control micro-batch contains at least one row
>     if len(dfnewtopic.take(1)) > 0:
>         #print(f"""newtopic batchId is {batchId}""")
>         #dfnewtopic.show(10,False)
>         # Read queue and status from the first row of the batch
>         queue = dfnewtopic.select(col("queue")).collect()[0][0]
>         status = dfnewtopic.select(col("status")).collect()[0][0]
>
>         if (queue == 'md') and (status == 'false'):
>             # s and config are my own helper module and settings (not shown)
>             spark_session = s.spark_session(config['common']['appName'])
>             active = spark_session.streams.active
>             # Stop the business query registered with queryName('md')
>             for e in active:
>                 #print(e)
>                 name = e.name
>                 if name == 'md':
>                     print(f"""Terminating streaming process {name}""")
>                     e.stop()
>     else:
>         print("DataFrame newtopic is empty")
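
One way to make the stopping logic above easier to test is to pull the loop over active queries into its own function. This is only a sketch: `stop_query_by_name` and `StubQuery` are hypothetical names, and the stub merely imitates the `name` attribute and `stop()` method that the function relies on. In a real job the list would presumably come from `spark_session.streams.active`, as in the function above.

```python
def stop_query_by_name(active_queries, name):
    """Stop every query in the list whose name matches; return the count."""
    stopped = 0
    for query in active_queries:
        if query.name == name:
            print(f"Terminating streaming process {query.name}")
            query.stop()
            stopped += 1
    return stopped


class StubQuery:
    """Hypothetical test double exposing only name and stop()."""

    def __init__(self, name):
        self.name = name
        self.stopped = False

    def stop(self):
        self.stopped = True


queries = [StubQuery("md"), StubQuery("newtopic")]
count = stop_query_by_name(queries, "md")
```

Only the business query named 'md' is stopped here; the control query 'newtopic' keeps running so it can receive further instructions.
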
>
> This seems to work: I checked that in this case data was written and saved
> to the target sink (a BigQuery table). It waits until the data is written
> completely, meaning the current streaming message is processed, so there
> is some latency (the wait for graceful completion)
>
> This is the output
>
> Terminating streaming process md
> wrote to DB  ## this is the flag I added to ensure the current
> micro-batch was completed
> 2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md [id
> = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
> 2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>
> The various termination processes are described in
>
> Structured Streaming Programming Guide - Spark 3.1.1 Documentation
> (apache.org)
> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries>
>
> This is the idea I came up with which allows ending the streaming process
> with least cost.
>
> HTH
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 5 May 2021 at 17:30, Gourav Sengupta <
> gourav.sengupta.develo...@gmail.com> wrote:
>
>> Hi,
>>
>> just thought of reaching out once again to seek your kind help in finding
>> the best way to stop SPARK streaming gracefully. Do we still use the
>> method of creating a file, as in SPARK 2.4.x, which is several years old,
>> or do we have a better approach in SPARK 3.1?
>>
>> Regards,
>> Gourav Sengupta
>>
>> ---------- Forwarded message ---------
>> From: Gourav Sengupta <gourav.sengupta.develo...@gmail.com>
>> Date: Wed, Apr 21, 2021 at 10:06 AM
>> Subject: Graceful shutdown SPARK Structured Streaming
>> To: <user@spark.apache.org>
>>
>>
>> Dear friends,
>>
>> is there any documentation available for gracefully stopping SPARK
>> Structured Streaming in 3.1.x?
>>
>> I am referring to articles which are 4 to 5 years old and was wondering
>> whether there is a better way available today to gracefully shut down a
>> SPARK streaming job.
>>
>> Thanks a ton in advance for all your kind help.
>>
>> Regards,
>> Gourav Sengupta
>>
>
