Hi,

I am dealing with two topics in Spark Structured Streaming (SSS).


Topic "md" returns market data values, as per:


            result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     foreachBatch(sendToSink). \
                     trigger(processingTime='30 seconds'). \
                     start()

Topic "newtopic" returns one row, including a status column:

           newtopicResult = streamingNewtopic.select( \
                     col("newtopic_value.uuid").alias("uuid") \
                   , col("newtopic_value.timecreated").alias("timecreated") \
                   , col("newtopic_value.status").alias("status")). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     foreachBatch(sendToControl). \
                     trigger(processingTime='2 seconds'). \
                     start()

The method sendToSink writes the values to a BigQuery table:

def sendToSink(df, batchId):
    if len(df.take(1)) > 0:
        print(f"{batchId}")
        # cache the micro-batch first, since both show() and the write use it
        df.persist()
        df.show(100)
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append",
                         config['MDVariables']['targetDataset'],
                         config['MDVariables']['targetTable'])
        df.unpersist()
    else:
        print("DataFrame is empty")

The second method simply reads the value of the status column:

def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
        print(f"newtopic batchId is {batchId}")
        status = dfnewtopic.select(col("status")).collect()[0][0]
    else:
        print("DataFrame newtopic is empty")

I need to be able to check, from within the sendToSink method, the status
value read in the sendToControl method. Note that the status value may change
from one micro-batch to the next as the stream progresses. What would be the
best approach?
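
To make the question concrete, here is a minimal sketch of one option I can
think of, not a confirmed solution. It assumes both queries are started from
the same driver process and that, as I understand it for classic PySpark, the
foreachBatch functions run in that driver process, so a lock-protected object
can be shared between them. LatestStatus, latest_status, SKIP_STATUS and
write_batch are hypothetical names; the status value "false" is an assumption
about what the control topic sends.

```python
import threading


class LatestStatus:
    """Thread-safe holder for the most recent status value (hypothetical helper)."""

    def __init__(self, initial=None):
        self._lock = threading.Lock()
        self._value = initial

    def set(self, value):
        with self._lock:
            self._value = value

    def get(self):
        with self._lock:
            return self._value


# Shared between the two foreachBatch functions (assumes one driver process).
latest_status = LatestStatus()

# Assumed status value meaning "do not write"; adjust to the real control values.
SKIP_STATUS = "false"


def write_batch(df):
    # Placeholder: replace with the s.writeTableToBQ(df, "append", ...) call.
    print("writing batch")


def sendToControl(dfnewtopic, batchId):
    # Read at most one row and remember its status for the other query.
    rows = dfnewtopic.select("status").take(1)
    if rows:
        latest_status.set(rows[0][0])
    else:
        print("DataFrame newtopic is empty")


def sendToSink(df, batchId):
    # The return value is only for illustration; foreachBatch ignores it.
    if not df.take(1):
        print("DataFrame is empty")
        return False
    if latest_status.get() == SKIP_STATUS:
        print(f"Skipping batch {batchId}, status is {SKIP_STATUS}")
        return False
    write_batch(df)
    return True
```

With this sketch, sendToSink sees whatever status the most recent non-empty
"newtopic" micro-batch carried, and it writes as usual until a first status
arrives. Nothing is persisted across restarts, so the holder starts empty
again after a restart.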

Thanks


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



