Hi Giuseppe,

Just looked over your PySpark code. You are doing Spark Structured
Streaming (SSS)

Your kafka topic sends messages every two seconds and regardless you want
to enrich the data every 5 minutes. In other words weait for 5 minutes to
build the batch.

You can either run wait for 5 minutes to trigger calculation

              result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     *foreachBatch(SendToBigQuery). \   ## you do your
temperature here in this method *
  *                   trigger(processingTime='300 seconds'). \*
                     start()
        except Exception as e:
                print(f"""{e}, quitting""")
                sys.exit(1)
        result.awaitTermination()

Or conversely run a SSS job every 5 minutes (through cron or airflow etc)
to process the data once (


                    trigger(once = True). \

                   option('checkpointLocation', checkpoint_path). \



And terminate the process.


Check this article of mine in Linkedin


Processing Change Data Capture with Spark Structured Streaming | LinkedIn
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=YTSGjnuSWGqgGbToJ5TQkw%3D%3D>


It also elaborates about the role of triggers.



HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 10 May 2021 at 16:19, Giuseppe Ricci <peppepega...@gmail.com> wrote:

> Hi, I'm new on Apache Spark.
> I'm trying to read data from an Apache Kafka topic (I have a simulated
> temperature sensor producer which sends data every 2 second) and I need
> every 5 minutes to calculate the average temperature. Reading documentation
> I understand I need to use windows but I'm not able to finalize my code.
> Can some help me?
> How can I launch batches every 5 minutes? My code works one time and
> finishes. Why in the console I can't find any helpful information for
> correct execution? See attached picture.
>
> This is my code:
> https://pastebin.com/4S31jEeP
>
> Thanks for your precious help.
>
>
>
> PhD. Giuseppe Ricci
>
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to