I don't quite get it. Aren't you applying these to the same stream and the
same batches? Worst case, why not apply them as one function?
Otherwise, how do you mean to associate one call with another?
Globals don't help here. They aren't global beyond the driver, and which
one would belong to which batch?
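
If the goal is only to print the latest ID seen by one callback from inside
the other, here is a minimal sketch of a keyed, lock-protected record kept on
the driver. It assumes both foreachBatch callbacks run in the same driver
Python process, which may not hold in every deployment. The names
BatchIdRegistry, send_to_sink and send_to_control are hypothetical, not taken
from your code. Each stream numbers its batches independently, so the value
read is whatever the other stream recorded most recently, not a matching batch.

```python
import threading


class BatchIdRegistry:
    """Driver-side record of the most recent batch ID seen per stream."""

    def __init__(self):
        self._lock = threading.Lock()
        self._latest = {}

    def record(self, stream_name, batch_id):
        # Store the newest batch ID under the stream's own key.
        with self._lock:
            self._latest[stream_name] = batch_id

    def latest(self, stream_name):
        # Returns None if that stream has not processed a batch yet.
        with self._lock:
            return self._latest.get(stream_name)


registry = BatchIdRegistry()


def send_to_sink(df, batch_id):
    registry.record("md", batch_id)
    # ... write df to the sink here ...


def send_to_control(df, batch_id):
    registry.record("newtopic", batch_id)
    md_batch_id = registry.latest("md")
    print(f"newtopic batchId is {batch_id}, latest md batchId is {md_batch_id}")
    return md_batch_id
```

Keying by stream name is what answers "which one would be which batch": each
callback writes only its own entry and reads the other's.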

On Sat, Mar 4, 2023 at 3:02 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Thanks. They are different batchIds:
>
> From sendToControl, newtopic batchId is 76
> From sendToSink, md, batchId is 563
>
> As a matter of interest, why does a global variable not work?
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 4 Mar 2023 at 20:13, Sean Owen <sro...@gmail.com> wrote:
>
>> It's the same batch ID already, no?
>> Or why not simply put the logic of both in one function? or write one
>> function that calls both?
>>
>> On Sat, Mar 4, 2023 at 2:07 PM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>>
>>> This is probably pretty straightforward, but somehow it does not look
>>> that way.
>>>
>>>
>>>
>>> In Spark Structured Streaming, "foreachBatch" performs custom write
>>> logic on each micro-batch through a callback function. For example,
>>>
>>> foreachBatch(sendToSink) calls sendToSink with 2 parameters: first, the
>>> micro-batch as a DataFrame or Dataset, and second, a unique ID for each batch.
>>>
>>>
>>>
>>> In my case I read two topics simultaneously and handle them through two
>>> separate functions:
>>>
>>>
>>>
>>>    1. foreachBatch(sendToSink). \
>>>    2. foreachBatch(sendToControl). \
>>>
>>> This is the code:
>>>
>>> from datetime import datetime
>>>
>>> # `s` (helper module) and `config` are defined elsewhere in the application.
>>>
>>> def sendToSink(df, batchId):
>>>     if len(df.take(1)) > 0:
>>>         print(f"From sendToSink, md, batchId is {batchId}, at {datetime.now()}")
>>>         # df.show(100, False)
>>>         df.persist()
>>>         # write to BigQuery batch table
>>>         # s.writeTableToBQ(df, "append",
>>>         #     config['MDVariables']['targetDataset'],
>>>         #     config['MDVariables']['targetTable'])
>>>         df.unpersist()
>>>         # print("wrote to DB")
>>>     else:
>>>         print("DataFrame md is empty")
>>>
>>> def sendToControl(dfnewtopic, batchId2):
>>>     if len(dfnewtopic.take(1)) > 0:
>>>         print(f"From sendToControl, newtopic batchId is {batchId2}")
>>>         dfnewtopic.show(100, False)
>>>         # read the first row once and pick out queue and status
>>>         row = dfnewtopic.first()
>>>         queue = row[2]
>>>         status = row[3]
>>>         print(f"testing queue is {queue}, and status is {status}")
>>>         if queue == config['MDVariables']['topic'] and status == 'false':
>>>             spark_session = s.spark_session(config['common']['appName'])
>>>             active = spark_session.streams.active
>>>             # stop the streaming query whose name matches the topic
>>>             for e in active:
>>>                 name = e.name
>>>                 if name == config['MDVariables']['topic']:
>>>                     print(f"\n==> Request terminating streaming process for topic {name} at {datetime.now()}\n")
>>>                     e.stop()
>>>     else:
>>>         print("DataFrame newtopic is empty")
>>>
>>>
>>> The problem I have is how to share the batchId from the first function
>>> with the second function, sendToControl(dfnewtopic, batchId2), so I can
>>> print it out there.
>>>
>>>
>>> Defining a global did not work, so it sounds like I am missing
>>> something rudimentary here!
>>>
>>>
>>> Thanks
>>>
>>
