Hi Giuseppe,

I just looked over your PySpark code. You are using Spark Structured Streaming (SSS).
Your Kafka topic sends a message every two seconds, and you want to enrich the data every 5 minutes. In other words, you wait 5 minutes to build each batch. You can either keep the streaming job running and trigger the calculation every 5 minutes:

    import sys
    from pyspark.sql.functions import col

    # streamingDataFrame and SendToBigQuery are defined elsewhere in the job
    try:
        # do your temperature calculation inside the foreachBatch method
        result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                 writeStream. \
                 outputMode('append'). \
                 option("truncate", "false"). \
                 foreachBatch(SendToBigQuery). \
                 trigger(processingTime='300 seconds'). \
                 start()
    except Exception as e:
        print(f"""{e}, quitting""")
        sys.exit(1)

    result.awaitTermination()

Alternatively, run an SSS job every 5 minutes (through cron, Airflow, etc.) that processes the available data once and then terminates the process, using

                 trigger(once = True). \
                 option('checkpointLocation', checkpoint_path). \

Check this article of mine on LinkedIn:

Processing Change Data Capture with Spark Structured Streaming | LinkedIn
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=YTSGjnuSWGqgGbToJ5TQkw%3D%3D>

It also elaborates on the role of triggers.

HTH

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

On Mon, 10 May 2021 at 16:19, Giuseppe Ricci <peppepega...@gmail.com> wrote:

> Hi, I'm new on Apache Spark.
> I'm trying to read data from an Apache Kafka topic (I have a simulated
> temperature sensor producer which sends data every 2 second) and I need
> every 5 minutes to calculate the average temperature.
> Reading documentation
> I understand I need to use windows but I'm not able to finalize my code.
> Can some help me?
> How can I launch batches every 5 minutes? My code works one time and
> finishes. Why in the console I can't find any helpful information for
> correct execution? See attached picture.
>
> This is my code:
> https://pastebin.com/4S31jEeP
>
> Thanks for your precious help.
>
> PhD. Giuseppe Ricci
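
On the windowing part of the question, a plain-Python sketch (no Spark required) may help show what a five-minute tumbling-window average computes over readings that arrive every 2 seconds. The function names and the simulated readings below are hypothetical, made up for illustration only; this is not the code from the pastebin.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW_SECONDS = 300  # five-minute tumbling window


def window_start(ts: datetime, size_seconds: int = WINDOW_SECONDS) -> datetime:
    """Return the start of the tumbling window that contains ts."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % size_seconds, tz=timezone.utc)


def average_per_window(readings):
    """Group (timestamp, temperature) pairs by window and average each group."""
    buckets = defaultdict(list)
    for ts, temperature in readings:
        buckets[window_start(ts)].append(temperature)
    return {start: sum(vals) / len(vals) for start, vals in sorted(buckets.items())}


# Simulated sensor (hypothetical data): one reading every 2 seconds for 10 minutes.
t0 = datetime(2021, 5, 10, 16, 0, 0, tzinfo=timezone.utc)
readings = [(t0 + timedelta(seconds=2 * i), 20.0 + (i % 5)) for i in range(300)]

averages = average_per_window(readings)
for start, avg in averages.items():
    print(start.isoformat(), round(avg, 2))
```

In Structured Streaming the same grouping would typically be expressed with `window` from `pyspark.sql.functions`, for example by grouping on `window(col("timestamp"), "5 minutes")` and applying `avg`. That assumes the parsed messages carry an event-time column; the name `timestamp` here is an assumption.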