Thanks, Mich .. will check it out.

Regards,
Karan Alang

On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

BTW, you can check this LinkedIn article of mine on Processing Change Data
Capture with Spark Structured Streaming:
https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D

It covers the concept of triggers, including trigger(once=True), i.e. a
one-time batch, in Spark Structured Streaming.
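(In code, the one-time batch form is just a different trigger setting on the
writer. A minimal sketch, assuming a streaming DataFrame named
streamingDataFrame as in the code further down, and a console sink for
illustration:

    # process whatever is currently available in the source once, then stop
    query = streamingDataFrame.writeStream. \
                outputMode('append'). \
                trigger(once=True). \
                format('console'). \
                start()
)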
HTH

View my LinkedIn profile:
https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

On Mon, 7 Feb 2022 at 23:06, karan alang <karan.al...@gmail.com> wrote:

Thanks, Mich .. that worked fine!

On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Read below.

    """
    "foreach" performs custom write logic on each row, and "foreachBatch"
    performs custom write logic on each micro-batch, here through the
    SendToBigQuery function. foreachBatch(SendToBigQuery) expects a function
    taking 2 parameters, first: the micro-batch as a DataFrame or Dataset,
    and second: a unique id for each batch --> batchId.
    Using foreachBatch, we write each micro-batch to storage defined in our
    custom logic. In this case, we store the output of our streaming
    application in a Google BigQuery table.
    Note that we are appending data, and column "rowkey" is defined as a
    UUID so it can be used as the primary key.
    """
    from pyspark.sql.functions import col

    result = streamingDataFrame.select(
                 col("parsed_value.rowkey").alias("rowkey"),
                 col("parsed_value.ticker").alias("ticker"),
                 col("parsed_value.timeissued").alias("timeissued"),
                 col("parsed_value.price").alias("price")). \
             writeStream. \
             outputMode('append'). \
             option("truncate", "false"). \
             foreachBatch(SendToBigQuery). \
             trigger(processingTime='2 seconds'). \
             start()

Now you define your function SendToBigQuery():

    def SendToBigQuery(df, batchId):
        if len(df.take(1)) > 0:
            df.printSchema()
            print(f"""batchId is {batchId}""")
            rows = df.count()
            print(f"""Total records processed in this run = {rows}""")
            ...    # the actual write of df to the BigQuery table goes here
        else:
            print("DataFrame is empty")

HTH
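(On the row-level foreach route asked about below: in PySpark,
writeStream.foreach() can take a plain function that Spark calls once per
Row, or an object with open/process/close methods. A minimal sketch, with a
hypothetical process_row doing the per-row work on the same columns as
above:

    def process_row(row):
        # 'row' is a pyspark.sql.Row holding one record of the stream
        print(row["rowkey"], row["price"])   # replace with real per-row logic

    result = streamingDataFrame.writeStream. \
                 outputMode('append'). \
                 foreach(process_row). \
                 start()
)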
On Mon, 7 Feb 2022 at 21:06, karan alang <karan.al...@gmail.com> wrote:

Hello All,

I'm using Structured Streaming to read data from Kafka, and I need to do a
transformation on each individual row.

I'm trying to use 'foreach' (or 'foreachBatch'), and running into issues.
Basic question - how is the row passed to the function when foreach is
used?

Also, when I use foreachBatch, it seems the batchId is available in the
function called? How do I access individual rows?

Details are in this Stack Overflow question:
https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working

What is the best approach for this use-case?

tia!
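(A note on that last part: the function passed to foreachBatch receives the
micro-batch as an ordinary DataFrame, so individual rows can be reached with
the usual batch APIs. A minimal sketch, using a hypothetical ShowRows
handler and assuming batches are small enough to iterate on the driver:

    def ShowRows(df, batchId):
        # 'df' is a plain DataFrame containing just this micro-batch;
        # toLocalIterator() streams its rows back to the driver one by one
        for row in df.toLocalIterator():
            print(batchId, row["rowkey"], row["price"])

    result = streamingDataFrame.writeStream. \
                 outputMode('append'). \
                 foreachBatch(ShowRows). \
                 start()
)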