Thanks for this hint.

OK this is what I have done. In method sendToControl I get status value. It
should be something like below

+------------------------------------+-------------------+------+
|uuid                                |timeissued         |status|
+------------------------------------+-------------------+------+
|1aa609d7-6eac-40f0-8c48-846afd10e66f|2021-04-22 19:09:22|false |
+------------------------------------+-------------------+------+

Note that status column has been set to false, meaning terminate the
streaming

def sendToControl(dfnewtopic, batchId):
    if(len(dfnewtopic.take(1))) > 0:
        print(f"""newtopic batchId is {batchId}""")
        dfnewtopic.show(100,False)
        status = dfnewtopic.select(col("status")).collect()[0][0]
        if status == 'false':
          spark_session = s.spark_session(config['common']['appName'])
          print("""Terminating streaming processes""")
          *active = spark_session.streams.active  ## this is a list of
active streams*
*          for e in active:*
*             print(e)*
*             e.stop()  ## terminate it*
    else:
        print("DataFrame newtopic is empty")

The result is

Terminating streaming processes
<pyspark.sql.streaming.StreamingQuery object at 0x7f7ff71572b0>
<pyspark.sql.streaming.StreamingQuery object at 0x7f7ff7157278>

Then it terminates the topic newcolumn (the controlling topic is easy to
terminate because it does not do much) and tries to terminate the main
streaming topic which writes to Google BigQuery from on-premise. It loops
for the last time


+------------------------------------+------+-------------------+------+

|rowkey                              |ticker|timeissued         |price |

+------------------------------------+------+-------------------+------+

|8dd9deb7-795d-465b-9b6b-900aff4b16d4|TSCO  |2021-04-22 19:22:09|367.7 |

|5a33a47f-5771-4659-9dda-cc60beb12c97|BP    |2021-04-22 19:22:09|433.8 |

|85dde126-f808-4738-a85a-c8f283e91896|MKS   |2021-04-22 19:22:09|347.55|

|38ee1e4a-9436-4b4a-bed0-ac75006756fc|SBRY  |2021-04-22 19:22:09|548.0 |

|8fae78bc-54b5-43d4-bf5f-0bdade02bec0|VOD   |2021-04-22 19:22:09|190.9 |

|f7b53ec9-35b1-4657-89f5-cb863cde6b4c|MSFT  |2021-04-22 19:22:09|29.08 |

|8ad7ea69-c576-4892-a420-c6627425be11|IBM   |2021-04-22 19:22:09|117.49|

|f5f2f0f1-83ef-4dde-a9cf-7e5306920e85|MRW   |2021-04-22 19:22:09|193.5 |

|33a3c1f1-3219-4c48-9ed6-8d62db355da2|ORCL  |2021-04-22 19:22:09|31.78 |

|7c61e7a8-d633-4911-b625-8a068daebb54|SAP   |2021-04-22 19:22:09|31.97 |

+------------------------------------+------+-------------------+------+


Terminating streaming processes

<pyspark.sql.streaming.StreamingQuery object at 0x7f5ad2b6e080>

and comes back with this error


2021-04-22 19:22:03,916 ERROR streaming.MicroBatchExecution: Query [id =
4f6469ad-b664-4e80-bac7-babc78c83c85, runId =
5dbd3e93-2bdc-4ff9-9fb2-df532318225d] terminated with error

java.io.IOException: Failed on local exception:
java.nio.channels.ClosedByInterruptException; Host Details : local host is:
"rhes76/50.140.197.230"; destination host is: "rhes75":9000;

I think the error is expected, because there is no other way to stop a
streaming job? In other words I expect that error to be there?

For clarification, in a normal operation status is set to 'true' so
business as usual with streaming

Let me know your thoughts

Thanks


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 22 Apr 2021 at 17:28, Lalwani, Jayesh <jlalw...@amazon.com> wrote:

> You never want to do System.exit from inside your forEachBatch method
> because that doesn’t cleanly terminate the query. You should use call stop
> on each query to terminate it safely
>
> You need something like this
>
> result = streamingDataFrame.select( \
>
>                      col("parsed_value.rowkey").alias("rowkey") \
>
>                    , col("parsed_value.ticker").alias("ticker") \
>
>                    , col("parsed_value.timeissued").alias("timeissued") \
>
>                    , col("parsed_value.price").alias("price")). \
>
>                      writeStream. \
>
>                      outputMode('append'). \
>
>                      option("truncate", "false"). \
>
>                      foreachBatch(*sendToSink*). \
>
>                      trigger(processingTime='30 seconds'). \
>
>
> newtopicResult = streamingNewtopic.select( \
>
>                      col("newtopic_value.uuid").alias("uuid") \
>
>                    ,
> col("newtopic_value.timecreated").alias("timecreated") \
>
>                    , col("newtopic_value.status").alias("status")). \
>
>                      writeStream. \
>
>                      outputMode('append'). \
>
>                      option("truncate", "false"). \
>
>                      foreachBatch(*sendToControl*). \
>
>                      trigger(processingTime='2 seconds'). \
>
>                      start()
>
> spark.streams.awaitAnyTermination()
>
>
> def sendToControl(dfnewtopic, batchId):
>
>     if(len(dfnewtopic.take(1))) > 0:
>
>         print(f"""newtopic batchId is {batchId}""")
>
>         dfnewtopic.show(100,False)
>
>         *spark.streams.active.forEach(_.stop)*
>
>     else:
>
>         print("DataFrame newtopic is empty")
>
>
>
>
>
> *From: *Mich Talebzadeh <mich.talebza...@gmail.com>
> *Date: *Thursday, April 22, 2021 at 12:10 PM
> *To: *"Lalwani, Jayesh" <jlalw...@amazon.com>
> *Cc: *"user @spark" <user@spark.apache.org>
> *Subject: *RE: [EXTERNAL] Dealing with two topics in Spark Structured
> Streaming
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> Hi,
>
>
>
> Basically reading two streaming topics in Spark structured streaming.
>
>
>
> One topic called newtopic has this column status which could be
> either "true" or "false". It is effectively a controlling topic to make the
> main topic called md (market data) exit.
>
>
>
> I am experimenting with this. So topic md is streaming DAG (sequence of
> transformation on the topic) and the topic newtopic is the controlling DAG.
> I trust this makes sense.
>
>
>
> So here ideally I want to perform the following
>
>
>
> def sendToSink(df, batchId):
>
>     if(len(df.take(1))) > 0:
>
>         print(f"""md batchId is {batchId}""")
>
>         df.show(100,False)
>
>         df. persist()
>
>         # write to BigQuery batch table
>
>         s.writeTableToBQ(df, "append",
> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>
>         df.unpersist()
>
>         ## pseudo code
>
>         if status == "true"   ## I need to know this value coming from
> newtopic, and test it immediately after writing the message to BigQuery
>
>           terminate the process and exit from the Spark program for
> whatever reason. May need to refine this exit as well
>
>           System.exit(0)
>
>     else:
>
>         print("DataFrame md is empty")
>
>
>
> As I stated, I am experimenting with this.
>
>
>
> Thanks
>
>
>
>
>
>
>  [image: Image removed by sender.]  view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Thu, 22 Apr 2021 at 16:53, Lalwani, Jayesh <jlalw...@amazon.com> wrote:
>
> What are you trying to do? Can you give us a bigger picture?
>
>
>
> *From: *Mich Talebzadeh <mich.talebza...@gmail.com>
> *Date: *Thursday, April 22, 2021 at 11:43 AM
> *To: *"user @spark" <user@spark.apache.org>
> *Subject: *RE: [EXTERNAL] Dealing with two topics in Spark Structured
> Streaming
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> Hi,
>
>
>
> I need some ideas on this actually. These two functions return respective
> data frames in spark structured streaming. THey are as a result
> of foreachBatch()
>
>
>
> I just need to be able to access the value of* status* worked out
> in sendToControl() in sendToSink() method
>
>
>
>
>
> def sendToControl(dfnewtopic, batchId):
>
>     if(len(dfnewtopic.take(1))) > 0:
>
>         print(f"""newtopic batchId is {batchId}""")
>
>         dfnewtopic.show(100,False)
>
>         *status = dfnewtopic.select(col("status")).collect()[0][0]*
>
>     else:
>
>         print("DataFrame newtopic is empty")
>
>
>
> # need to access that *status* in below method
>
> def sendToSink(df, batchId):
>
>     if(len(df.take(1))) > 0:
>
>         print(f"""md batchId is {batchId}""")
>
>         df.show(100,False)
>
>         df. persist()
>
>         # write to BigQuery batch table
>
>         s.writeTableToBQ(df, "append",
> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>
>         df.unpersist()
>
>     else:
>
>         print("DataFrame md is empty")
>
>
>
> I know using global variable etc is not going to be viable. How can this
> be achieved? At anytime the value of status is either true or false
>
>
>
> Thanks
>
>
>
>
>
>  *Error! Filename not specified.*  view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Thu, 22 Apr 2021 at 10:29, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>
> Hi,
>
>
>
> I am dealing with two topics in SSS.
>
>
>
> Topic "md" returns market data values as per
>
>
>
>             result = streamingDataFrame.select( \
>
>                      col("parsed_value.rowkey").alias("rowkey") \
>
>                    , col("parsed_value.ticker").alias("ticker") \
>
>                    , col("parsed_value.timeissued").alias("timeissued") \
>
>                    , col("parsed_value.price").alias("price")). \
>
>                      writeStream. \
>
>                      outputMode('append'). \
>
>                      option("truncate", "false"). \
>
>                      foreachBatch(*sendToSink*). \
>
>                      trigger(processingTime='30 seconds'). \
>
>
>
> Topic "newtopic" returns one row including status column
>
>
>
>            newtopicResult = streamingNewtopic.select( \
>
>                      col("newtopic_value.uuid").alias("uuid") \
>
>                    ,
> col("newtopic_value.timecreated").alias("timecreated") \
>
>                    , col("newtopic_value.status").alias("status")). \
>
>                      writeStream. \
>
>                      outputMode('append'). \
>
>                      option("truncate", "false"). \
>
>                      foreachBatch(*sendToControl*). \
>
>                      trigger(processingTime='2 seconds'). \
>
>                      start()
>
>
>
> The method sendToSink writes values to a database BigQuery
>
>
>
> def sendToSink(df, batchId):
>
>     if(len(df.take(1))) > 0:
>
>         print(f"""{batchId}""")
>
>         df.show(100)
>
>         df. persist()
>
>         # write to BigQuery batch table
>
>         s.writeTableToBQ(df, "append",
> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>
>         df.unpersist()
>
>     else:
>
>         print("DataFrame is empty")
>
>
>
> The second method simply gets value for status column
>
>
>
> def sendToControl(dfnewtopic, batchId):
>
>     if(len(dfnewtopic.take(1))) > 0:
>
>         print(f"""newtopic batchId is {batchId}""")
>
>        * status = dfnewtopic.select(col("status")).collect()[0][0]*
>
>     else:
>
>         print("DataFrame newtopic is empty")
>
>
>
> I need to be able to check the status value from the sendToControl method
> in the sendToSink method. Note that status values may be changing as per
> streaming. What would be the best approach?
>
>
>
> Thanks
>
>
>
>
>
>  *Error! Filename not specified.*  view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>

Reply via email to