BTW, you can check my LinkedIn article on processing Change Data Capture
with Spark Structured Streaming:
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D>


It covers the concept of triggers, including trigger(once=True), the
one-time batch trigger in Spark Structured Streaming.
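
As a rough illustration of the one-time trigger (a minimal sketch, not taken from the article), the helper below starts a query that processes whatever data is available in a single run and then stops. The names run_one_time_batch, batch_handler and checkpoint_dir are hypothetical; streaming_df is assumed to be a streaming DataFrame you have already built, for example from Kafka.

```python
def run_one_time_batch(streaming_df, batch_handler, checkpoint_dir):
    """Start a streaming query that runs once and then stops.

    streaming_df   : an existing streaming DataFrame (assumed)
    batch_handler  : function taking (micro_batch_df, batch_id)
    checkpoint_dir : hypothetical path where Spark stores progress
    """
    query = (
        streaming_df.writeStream
        .foreachBatch(batch_handler)
        # The checkpoint lets the next run resume where this one stopped.
        .option("checkpointLocation", checkpoint_dir)
        # once=True: process the data available now, then stop the query.
        .trigger(once=True)
        .start()
    )
    # Block until the single run has finished.
    query.awaitTermination()
    return query
```

Scheduling such a job periodically gives batch-style processing while the checkpoint keeps track of what has already been consumed.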


HTH


   view my LinkedIn profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 7 Feb 2022 at 23:06, karan alang <karan.al...@gmail.com> wrote:

> Thanks, Mich .. that worked fine!
>
>
> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> read below
>>
>>             """
>>                "foreach" performs custom write logic on each row, while
>> "foreachBatch" performs custom write logic on each micro-batch, here
>> through the SendToBigQuery function.
>>                The function passed to foreachBatch (SendToBigQuery) takes
>> two parameters: first, the micro-batch as a DataFrame or Dataset; second,
>> a unique id for each batch (batchId).
>>                Using foreachBatch, we write each micro-batch to the storage
>> defined in our custom logic. In this case, we store the output of our
>> streaming application in a Google BigQuery table.
>>                Note that we are appending data, and the column "rowkey" is
>> defined as a UUID so it can be used as the primary key.
>>             """
>>             # assumes: from pyspark.sql.functions import col
>>             result = streamingDataFrame.select( \
>>                      col("parsed_value.rowkey").alias("rowkey") \
>>                    , col("parsed_value.ticker").alias("ticker") \
>>                    , col("parsed_value.timeissued").alias("timeissued") \
>>                    , col("parsed_value.price").alias("price")). \
>>                      writeStream. \
>>                      outputMode('append'). \
>>                      option("truncate", "false"). \
>>                      foreachBatch(SendToBigQuery). \
>>                      trigger(processingTime='2 seconds'). \
>>                      start()
>>
>> Now define your function SendToBigQuery():
>>
>>
>> def SendToBigQuery(df, batchId):
>>     # df is the micro-batch DataFrame; batchId is its unique id
>>     if len(df.take(1)) > 0:
>>         df.printSchema()
>>         print(f"""batchId is {batchId}""")
>>         rows = df.count()
>>         print(f""" Total records processed in this run = {rows}""")
>>         ......
>>     else:
>>         print("DataFrame is empty")
>>
>> *HTH*
>>
>>
>>
>> On Mon, 7 Feb 2022 at 21:06, karan alang <karan.al...@gmail.com> wrote:
>>
>>> Hello All,
>>>
>>> I'm using Structured Streaming to read data from Kafka, and need to
>>> apply a transformation to each individual row.
>>>
>>> I'm trying to use 'foreach' (or foreachBatch), and running into issues.
>>> Basic question: how is the row passed to the function when foreach is
>>> used?
>>>
>>> Also, when I use foreachBatch, it seems the batchId is available in the
>>> called function. How do I access individual rows?
>>>
>>> Details are on Stack Overflow:
>>>
>>> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>>>
>>> What is the best approach for this use case?
>>>
>>> tia!
>>>
>>
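
On the quoted question of how to reach individual rows inside a foreachBatch function, one possible approach is sketched below. It is an illustration under stated assumptions, not the code used in this thread: process_batch and transform_row are hypothetical names, and the "ticker" column is borrowed from the example schema quoted above. Spark ignores the handler's return value; it is returned here only so the result can be inspected.

```python
def transform_row(row_dict):
    """Hypothetical per-row transformation: upper-case the ticker."""
    out = dict(row_dict)
    out["ticker"] = str(out["ticker"]).upper()
    return out


def process_batch(df, batch_id):
    """foreachBatch handler: receives the micro-batch DataFrame and its id."""
    # toLocalIterator() brings rows to the driver one partition at a time;
    # each element is a Row, and asDict() turns it into a plain dict.
    rows = [transform_row(row.asDict()) for row in df.toLocalIterator()]
    print(f"batchId {batch_id}: {len(rows)} rows transformed")
    return rows


# Wiring (assumes an existing streaming DataFrame named streamingDataFrame):
#   streamingDataFrame.writeStream.foreachBatch(process_batch).start()
#
# With foreach instead, the function is called once per row and receives
# that Row as its only argument:
#   streamingDataFrame.writeStream.foreach(lambda row: print(row.asDict())).start()
```

Iterating on the driver is only reasonable for small micro-batches. For larger volumes, expressing the transformation with DataFrame operations inside the handler keeps the work distributed.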
