See below.

            """
               "foreach" performs custom write logic on each row and
"foreachBatch" performs custom write logic on each micro-batch through
SendToBigQuery function
                *foreachBatch(SendToBigQuery) expects 2 parameters, first:
micro-batch as DataFrame or Dataset and second: unique id for each batch
--> batchId*
               Using foreachBatch, we write each micro batch to storage
defined in our custom logic. In this case, we store the output of our
streaming application to Google BigQuery table.
               Note that we are appending data and column "rowkey" is
defined as UUID so it can be used as the primary key
            """
            result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     foreachBatch(SendToBigQuery). \
                     trigger(processingTime='2 seconds'). \
                     start()
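
Since the question also asks about foreach: unlike foreachBatch, foreach
invokes your function once per row, and each row is passed in as a
pyspark.sql.Row. A minimal sketch of that variant (process_row is a
hypothetical name used for illustration, not code from this thread):

            def process_row(row):
                # called once per row; fields are accessible by name
                print(row.rowkey, row.ticker, row.timeissued, row.price)

            result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                     writeStream. \
                     foreach(process_row). \
                     start()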

Back to foreachBatch: now you define your function SendToBigQuery():


def SendToBigQuery(df, batchId):

    if len(df.take(1)) > 0:

        df.printSchema()

        print(f"""batchId is {batchId}""")

        rows = df.count()

        print(f"""Total records processed in this run = {rows}""")

        ......

    else:

        print("DataFrame is empty")

HTH


View my LinkedIn profile:
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 7 Feb 2022 at 21:06, karan alang <karan.al...@gmail.com> wrote:

> Hello All,
>
> I'm using StructuredStreaming to read data from Kafka, and need to do
> transformation on each individual row.
>
> I'm trying to use 'foreach' (or foreachBatch), and running into issues.
> Basic question - how is the row passed to the function when foreach is
> used?
>
> Also, when I use foreachBatch, it seems the batchId is available in the
> function called? How do I access individual rows?
>
> Details are on Stack Overflow:
>
> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>
> What is the best approach for this use case?
>
> tia!
>
