Thanks, Mich .. will check it out

regds,
Karan Alang

On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> BTW you can check this Linkedin article of mine on Processing Change Data
> Capture with Spark Structured Streaming
> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D>
>
>
> It covers the concept of triggers, including trigger(once = True), i.e.
> one-time batch, in Spark Structured Streaming
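The two trigger modes mentioned above can be sketched as follows (a minimal sketch: `stream_df` is an assumed streaming DataFrame, and the helper name `trigger_kwargs` is mine, for illustration only):

```python
# The two trigger modes, as they appear on
# pyspark.sql.streaming.DataStreamWriter.trigger:
#
#   stream_df.writeStream.trigger(processingTime="2 seconds")  # micro-batch every 2s
#   stream_df.writeStream.trigger(once=True)                   # one-shot batch, then stop
#
# A tiny hypothetical helper that builds the kwargs you would pass to
# .trigger(); the two modes are mutually exclusive:
def trigger_kwargs(once=False, interval=None):
    if once:
        return {"once": True}
    return {"processingTime": interval or "2 seconds"}
```
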
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 7 Feb 2022 at 23:06, karan alang <karan.al...@gmail.com> wrote:
>
>> Thanks, Mich .. that worked fine!
>>
>>
>> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Read below:
>>>
>>>             """
>>>             "foreach" performs custom write logic on each row, and
>>>             "foreachBatch" performs custom write logic on each micro-batch,
>>>             here through the SendToBigQuery function.
>>>             foreachBatch(SendToBigQuery) expects two parameters: first,
>>>             the micro-batch as a DataFrame or Dataset; second, a unique
>>>             id for each batch (batchId).
>>>             Using foreachBatch, we write each micro-batch to the storage
>>>             defined in our custom logic; in this case, we store the output
>>>             of our streaming application in a Google BigQuery table.
>>>             Note that we are appending data, and column "rowkey" is
>>>             defined as a UUID so it can be used as the primary key.
>>>             """
>>>             result = streamingDataFrame.select( \
>>>                      col("parsed_value.rowkey").alias("rowkey") \
>>>                    , col("parsed_value.ticker").alias("ticker") \
>>>                    , col("parsed_value.timeissued").alias("timeissued") \
>>>                    , col("parsed_value.price").alias("price")). \
>>>                      writeStream. \
>>>                      outputMode('append'). \
>>>                      option("truncate", "false"). \
>>>                      foreachBatch(SendToBigQuery). \
>>>                      trigger(processingTime='2 seconds'). \
>>>                      start()
>>>
>>> Now define your function SendToBigQuery():
>>>
>>>
>>> def SendToBigQuery(df, batchId):
>>>
>>>     if len(df.take(1)) > 0:
>>>         df.printSchema()
>>>         print(f"""batchId is {batchId}""")
>>>         rows = df.count()
>>>         print(f"""Total records processed in this run = {rows}""")
>>>         ......
>>>     else:
>>>         print("DataFrame is empty")
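A hedged sketch of what the elided write step inside SendToBigQuery might look like with the spark-bigquery connector (the table and bucket names below are placeholders of mine, not from the email):

```python
def send_to_bigquery(df, batch_id):
    # skip empty micro-batches, as in the original function
    if len(df.take(1)) > 0:
        (df.write
           .format("bigquery")                                      # spark-bigquery connector
           .option("table", "my_project.my_dataset.ticker_prices")  # placeholder table name
           .option("temporaryGcsBucket", "my-temp-bucket")          # placeholder staging bucket
           .mode("append")                                          # matches outputMode('append')
           .save())
```
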
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, 7 Feb 2022 at 21:06, karan alang <karan.al...@gmail.com> wrote:
>>>
>>>> Hello All,
>>>>
>>>> I'm using Structured Streaming to read data from Kafka, and I need to do
>>>> a transformation on each individual row.
>>>>
>>>> I'm trying to use 'foreach' (or foreachBatch) and running into issues.
>>>> Basic question: how is the row passed to the function when foreach is
>>>> used?
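With foreach, Spark invokes your function once per Row of the micro-batch. A minimal sketch (`process_row` is a hypothetical name; Spark passes a pyspark.sql.Row, whose fields are accessible by name):

```python
def process_row(row):
    # row behaves like a mapping: row["ticker"], row["price"], ...
    return f'{row["ticker"]}={row["price"]}'

# wiring (assumes `result` is the streaming DataFrame):
# result.writeStream.foreach(process_row).start()
```
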
>>>>
>>>> Also, when I use foreachBatch, it seems the batchId is available in the
>>>> called function? How do I access the individual rows?
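For the second question: foreachBatch hands your function the micro-batch as a plain DataFrame plus a batchId, so individual rows are reachable with any batch API, e.g. collect(). A minimal sketch (`handle_batch` is a hypothetical name):

```python
def handle_batch(batch_df, batch_id):
    # batch_df is an ordinary (non-streaming) DataFrame, so any batch
    # operation works; collect() pulls the micro-batch to the driver
    return [(batch_id, row["ticker"]) for row in batch_df.collect()]

# wiring (assumes `result` is the streaming DataFrame):
# result.writeStream.foreachBatch(handle_batch).start()
```

Note that foreachBatch ignores the return value; the return here is only to make the sketch easy to inspect.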
>>>>
>>>> Details are on Stack Overflow:
>>>>
>>>> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>>>>
>>>> What is the best approach for this use-case ?
>>>>
>>>> tia!
>>>>
>>>
