OK, where is your watermark created? That step (withWatermark, then groupBy
over the window and avg) is the one that works out the average temperature!

        # Imports this fragment relies on (at the top of the module):
        #   import sys
        #   import pyspark.sql.functions as F
        #   from pyspark.sql.functions import col, from_json, window
        try:
            # construct a streaming dataframe streamingDataFrame that
            # subscribes to topic temperature
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers",
                        config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url",
                        config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms",
                        config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms",
                        config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms",
                        config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms",
                        config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", "temperature") \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"),
                                  schema).alias("parsed_value"))

            # get the individual columns from the parsed JSON
            resultC = streamingDataFrame.select(
                  col("parsed_value.rowkey").alias("rowkey")  # you do not need this
                , col("parsed_value.timestamp").alias("timestamp")
                , col("parsed_value.temperature").alias("temperature"))

            """
            We work out the window and the AVG(temperature) in the window's
            timeframe below.
            This should return the following DataFrame as a struct:

             root
             |-- window: struct (nullable = false)
             |    |-- start: timestamp (nullable = true)
             |    |-- end: timestamp (nullable = true)
             |-- avg(temperature): double (nullable = true)
            """
            resultM = resultC. \
                withWatermark("timestamp", "5 minutes"). \
                groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
                avg('temperature')

            # We take the above DataFrame resultM and flatten it to get the
            # columns aliased as "startOfWindowFrame", "endOfWindowFrame"
            # and "AVGTemperature"
            resultMF = resultM. \
                select(
                      F.col("window.start").alias("startOfWindowFrame")
                    , F.col("window.end").alias("endOfWindowFrame")
                    , F.col("avg(temperature)").alias("AVGTemperature"))

            resultMF.printSchema()

            result = resultMF. \
                writeStream. \
                outputMode('complete'). \
                option("numRows", 1000). \
                option("truncate", "false"). \
                format('console'). \
                option('checkpointLocation', checkpoint_path). \
                queryName("temperature"). \
                start()

        except Exception as e:
            print(f"""{e}, quitting""")
            sys.exit(1)

        result.awaitTermination()

This should work and return the average temperature for each window between
start and end.
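
For intuition, the windowing step can be imitated outside Spark. What follows
is only a minimal pure-Python sketch, not Spark's implementation. It assumes
5-minute tumbling windows aligned to the Unix epoch, and the names
window_start and windowed_average, as well as the three readings, are made up
for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)   # same width as window(..., "5 minutes", "5 minutes")
EPOCH = datetime(1970, 1, 1)    # assumption: windows are aligned to the epoch


def window_start(ts, width=WINDOW):
    """Floor a timestamp to the start of the tumbling window it falls in."""
    return ts - ((ts - EPOCH) % width)


def windowed_average(readings, width=WINDOW):
    """Group (timestamp, temperature) pairs by window and average each group."""
    buckets = defaultdict(list)
    for ts, temperature in readings:
        buckets[window_start(ts, width)].append(temperature)
    # One row per window: (start, end, average), ordered by window start.
    return [
        (start, start + width, sum(values) / len(values))
        for start, values in sorted(buckets.items())
    ]


# Hypothetical readings, not taken from the real topic.
readings = [
    (datetime(2021, 5, 17, 19, 35, 10), 24),
    (datetime(2021, 5, 17, 19, 37, 45), 26),
    (datetime(2021, 5, 17, 19, 41, 0), 25),
]
rows = windowed_average(readings)
for start, end, avg in rows:
    print(start, end, avg)
```

The point is that the start and end columns do not come from the JSON
payload; they are derived from the timestamp when the rows are grouped. A
DataFrame that only selects timestamp and temperature therefore has no
window column until the groupBy has been applied.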

Sample output

root
 |-- startOfWindowFrame: timestamp (nullable = true)
 |-- endOfWindowFrame: timestamp (nullable = true)
 |-- AVGTemperature: double (nullable = true)

-------------------------------------------
Batch: 15
-------------------------------------------
+-------------------+-------------------+------------------+
|startOfWindowFrame |endOfWindowFrame   |AVGTemperature    |
+-------------------+-------------------+------------------+
|2021-05-17 19:35:00|2021-05-17 19:40:00|24.8              |
|2021-05-17 19:45:00|2021-05-17 19:50:00|27.0              |
|2021-05-17 20:25:00|2021-05-17 20:30:00|24.4              |
|2021-05-17 20:20:00|2021-05-17 20:25:00|25.4              |
|2021-05-17 19:25:00|2021-05-17 19:30:00|24.25             |
|2021-05-17 19:55:00|2021-05-17 20:00:00|23.5              |
|2021-05-21 15:30:00|2021-05-21 15:35:00|23.0              |
|2021-05-17 19:50:00|2021-05-17 19:55:00|25.0              |
|2021-05-17 20:30:00|2021-05-17 20:35:00|25.8              |
|2021-05-17 20:10:00|2021-05-17 20:15:00|25.25             |
|2021-05-17 19:30:00|2021-05-17 19:35:00|27.0              |
|2021-05-17 20:15:00|2021-05-17 20:20:00|23.8              |
|2021-05-17 20:00:00|2021-05-17 20:05:00|24.666666666666668|
|2021-05-17 19:40:00|2021-05-17 19:45:00|25.5              |
|2021-05-17 20:05:00|2021-05-17 20:10:00|26.4              |
+-------------------+-------------------+------------------+
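
Because the query uses outputMode('complete'), every batch prints all the
windows seen so far rather than only the ones that changed, and the rows are
not sorted unless you ask for it. A rough pure-Python sketch of that
behaviour is below. It is an illustration under assumptions, not how Spark
stores its state: the class CompleteModeAverage and the four one-message
batches are hypothetical.

```python
from datetime import datetime, timedelta

WIDTH = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)   # assumption: windows are aligned to the epoch


class CompleteModeAverage:
    """Keeps a running [sum, count] per window across micro-batches."""

    def __init__(self):
        self.state = {}  # window start -> [sum, count]

    def process_batch(self, batch):
        for ts, temperature in batch:
            start = ts - ((ts - EPOCH) % WIDTH)
            entry = self.state.setdefault(start, [0.0, 0])
            entry[0] += temperature
            entry[1] += 1
        # Complete mode: return the whole result table, not just changed rows.
        # Sorting here is a convenience of the sketch only.
        return {start: total / count
                for start, (total, count) in sorted(self.state.items())}


query = CompleteModeAverage()
# Hypothetical batches, one message each.
batches = [
    [(datetime(2021, 5, 15, 22, 33), 28)],
    [(datetime(2021, 5, 15, 22, 34), 23)],
    [(datetime(2021, 5, 15, 22, 35), 22)],
    [(datetime(2021, 5, 15, 22, 36), 30)],
]
outputs = [query.process_batch(batch) for batch in batches]
for number, table in enumerate(outputs, start=1):
    print(f"Batch: {number}")
    for start, avg in table.items():
        print(f"  {start} -> {start + WIDTH}: {avg}")
```

Older windows keep reappearing with an unchanged average while the newest
window is updated, which is the same pattern you see from one console batch
to the next.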


HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 21 May 2021 at 11:48, Giuseppe Ricci <peppepega...@gmail.com> wrote:

>
> Hi Mich,
>
> Thank you for your help. This is my Spark script:
>
> # Spark session & context
> spark = (SparkSession
>          .builder
>          .master('local')
>          .appName('TemperatureStreamApp')
>          # Add kafka package
>          .config("spark.jars.packages",
> "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
>          .getOrCreate())
>
> schema = StructType().add("timestamp", TimestampType()).add("temperature",
> IntegerType())
>
> streamingDataFrame = spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", "localhost:9092") \
>     .option("subscribe", "temperature") \
>     .option("failOnDataLoss", "false") \
>     .option("includeHeaders", "true") \
>     .option("startingOffsets", "latest") \
>     .load() \
>     .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
>
> # get columns from struct
> resultM = streamingDataFrame.select( \
>          col("parsed_value.timestamp").alias("timestamp"), \
>          col("parsed_value.temperature").alias("temperature"))
>
>
> resultM.printSchema() #struc with 2 columns
>
> resultMF = resultM. \
>            select( \
>                 F.col("window.start").alias("startOfWindow") \
>               , F.col("window.end").alias("endOfWindow") \
>               , F.col("avg(temperature)").alias("AVGTemperature"))
> uuidUdf = F.udf(lambda: str(uuid.uuid4()), StringType())
>
> resultK = resultMF.withColumn("uuid", uuidUdf()) \
>     .selectExpr("CAST(uuid AS STRING) AS key",
> "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
>     .writeStream \
>     .outputMode('complete') \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", "localhost:9092", ) \
>     .option("topic", "avgtemperature") \
>     .option('checkpointLocation',
> "/home/kafka/Documenti/confluent/examples-6.1.0-post/clients/cloud/python/kafkaStream")
> \
>     .queryName("avgtemperature") \
>     .start()
>
> This follows your last suggestion, but my structure has only two columns:
> timestamp and temperature. As a result your script does not work, because
> it assumes that the window start and end columns exist.
> How can I get a structure similar to the one in your code?
> Thanks.
>
>
> PhD. Giuseppe Ricci
>
>
>
> On Tue, 18 May 2021 at 16:58, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> something like below:
>>
>>              root
>>              |-- window: struct (nullable = false)
>>              |    |-- start: timestamp (nullable = true)
>>              |    |-- end: timestamp (nullable = true)
>>              |-- avg(temperature): double (nullable = true)
>>
>>     import uuid
>>     import pyspark.sql.functions as F
>>     from pyspark.sql.types import StringType
>>
>>     # We take the above DataFrame and flatten it to get the columns
>>     # aliased as "startOfWindow", "endOfWindow" and "AVGTemperature"
>>     resultMF = resultM. \
>>         select(
>>               F.col("window.start").alias("startOfWindow")
>>             , F.col("window.end").alias("endOfWindow")
>>             , F.col("avg(temperature)").alias("AVGTemperature"))
>>
>>     # A Kafka producer requires a key, value pair. We generate a UUID key
>>     # as the unique identifier of the Kafka record
>>     uuidUdf = F.udf(lambda: str(uuid.uuid4()), StringType())
>>
>>     """
>>     You are using Spark to write to a Kafka topic called avgtemperature,
>>     using Spark as a Kafka producer.
>>     We take the DataFrame resultMF containing temperature info and write
>>     it to Kafka. The uuid is serialized as a string and used as the key.
>>     We take all the columns of the DataFrame and serialize them as a JSON
>>     string, putting the results in the "value" of the record.
>>     """
>>     result = resultMF.withColumn("uuid", uuidUdf()) \
>>         .selectExpr("CAST(uuid AS STRING) AS key",
>>                     "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
>>         .writeStream \
>>         .outputMode('complete') \
>>         .format("kafka") \
>>         .option("kafka.bootstrap.servers",
>>                 config['MDVariables']['bootstrapServers']) \
>>         .option("topic", "avgtemperature") \
>>         .option('checkpointLocation', checkpoint_path) \
>>         .queryName("avgtemperature") \
>>         .start()
>>
>> This should work
>>
>> On Tue, 18 May 2021 at 14:25, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Ok let me provide some suggestions here.
>>>
>>> resultM is a DataFrame, and if you do
>>>
>>> resultM.printSchema()
>>>
>>> you will get the struct column called window with two columns, namely
>>> start and end, plus the average temperature. Just try to confirm that now.
>>>
>>> HTH,
>>>
>>> Mich
>>>
>>> On Tue, 18 May 2021 at 14:15, Giuseppe Ricci <peppepega...@gmail.com>
>>> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> Yes I set resultM as you suggested in your previous code.
>>>> schema = StructType().add("timestamp",
>>>> TimestampType()).add("temperature", IntegerType())
>>>>
>>>> streamingDataFrame = spark \
>>>>     .readStream \
>>>>     .format("kafka") \
>>>>     .option("kafka.bootstrap.servers", "localhost:9092") \
>>>>     .option("subscribe", "temperature") \
>>>>     .option("failOnDataLoss", "false") \
>>>>     .option("includeHeaders", "true") \
>>>>     .option("startingOffsets", "latest") \
>>>>     .load() \
>>>>     .select(from_json(col("value").cast("string"),
>>>> schema).alias("parsed_value"))
>>>>
>>>> # get columns from struct
>>>> resultM = streamingDataFrame.select( \
>>>>          col("parsed_value.timestamp").alias("timestamp"), \
>>>>          col("parsed_value.temperature").alias("temperature"))
>>>>
>>>> result = (resultM. \
>>>>      withWatermark("timestamp", "1 minutes"). \
>>>>      groupBy(window(resultM.timestamp, "1 minutes", "1 minutes")). \
>>>>      avg('temperature'). \
>>>>      writeStream. \
>>>>      outputMode('complete'). \
>>>>      option("numRows", 100). \
>>>>      option("truncate", "false").
>>>>      option('checkpointLocation',
>>>> "/home/kafka/Documenti/confluent/examples-6.1.0-post/clients/cloud/python/kafkaStream").
>>>> \
>>>>      format('console'). \
>>>>      queryName("temperature"). \
>>>>      start())
>>>>
>>>>
>>>> qk = (resultM.
>>>>       selectExpr("CAST(timestamp AS STRING)", "CAST(temperature AS
>>>> STRING)") \
>>>>       .writeStream \
>>>>       .format("kafka") \
>>>>       .option("kafka.bootstrap.servers", "localhost:9092") \
>>>>       .option('checkpointLocation',
>>>> "/home/kafka/Documenti/confluent/examples-6.1.0-post/clients/cloud/python/kafkaStream")
>>>> \
>>>>       .option("topic", "avgtemperature") \
>>>>       .start())
>>>>
>>>> KR
>>>> g
>>>>
>>>>
>>>> PhD. Giuseppe Ricci
>>>>
>>>>
>>>>
>>>> On Mon, 17 May 2021 at 19:01, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Giuseppe ,
>>>>>
>>>>> How have you defined your resultM above in qK?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Mon, 17 May 2021 at 17:18, Giuseppe Ricci <peppepega...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Mich,
>>>>>>
>>>>>> Thanks for your extraordinary support.
>>>>>> Your previous code worked well, but I still get the error from my previous
>>>>>> mail when writing the average temperature to the Kafka topic avgtemperature.
>>>>>> There is some error in this code:
>>>>>>
>>>>>> qk = (resultM.
>>>>>>       selectExpr("CAST(timestamp AS STRING)", "CAST(temperature AS
>>>>>> STRING)") \
>>>>>>       .writeStream \
>>>>>>       .format("kafka") \
>>>>>>       .option("kafka.bootstrap.servers", "localhost:9092") \
>>>>>>       .option('checkpointLocation',
>>>>>> "/home/kafka/Documenti/confluent/examples-6.1.0-post/clients/cloud/python/kafkaStream")
>>>>>> \
>>>>>>       .option("topic", "avgtemperature") \
>>>>>>       .start())
>>>>>>
>>>>>> I hope it is clear.
>>>>>> Thanks.
>>>>>>
>>>>>>
>>>>>>
>>>>>> PhD. Giuseppe Ricci
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, 17 May 2021 at 16:33, Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Giuseppe,
>>>>>>>
>>>>>>> Your error states: Required attribute 'value' not found
>>>>>>>
>>>>>>> First, can you read your streaming data OK?
>>>>>>>
>>>>>>> In my stream the data is in JSON format, with three columns.
>>>>>>>
>>>>>>> Example:
>>>>>>>
>>>>>>> {"rowkey":"f0577406-a7d3-4c52-9998-63835ea72133",
>>>>>>> "timestamp":"2021-05-17T15:17:27", "temperature":27}
>>>>>>>
>>>>>>> The first column is a UUID, the second is the timestamp and the third is
>>>>>>> the temperature.
>>>>>>>
>>>>>>> I need to tell Spark Structured Streaming (SSS) how the columns are
>>>>>>> formatted.
>>>>>>>
>>>>>>> I define the schema as follows:
>>>>>>>
>>>>>>>         schema = StructType().add("rowkey", StringType()) \
>>>>>>>             .add("timestamp", TimestampType()) \
>>>>>>>             .add("temperature", IntegerType())
>>>>>>>         checkpoint_path = "file:///ssd/hduser/temperature2/chkpt"
>>>>>>>         try:
>>>>>>>             # construct a streaming dataframe streamingDataFrame
>>>>>>>             # that subscribes to topic temperature
>>>>>>>             streamingDataFrame = self.spark \
>>>>>>>                 .readStream \
>>>>>>>                 .format("kafka") \
>>>>>>>                 .option("kafka.bootstrap.servers",
>>>>>>>                         config['MDVariables']['bootstrapServers']) \
>>>>>>>                 .option("schema.registry.url",
>>>>>>>                         config['MDVariables']['schemaRegistryURL']) \
>>>>>>>                 .option("group.id", config['common']['appName']) \
>>>>>>>                 .option("zookeeper.connection.timeout.ms",
>>>>>>>                         config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>>>>>                 .option("rebalance.backoff.ms",
>>>>>>>                         config['MDVariables']['rebalanceBackoffMS']) \
>>>>>>>                 .option("zookeeper.session.timeout.ms",
>>>>>>>                         config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>>>>>                 .option("auto.commit.interval.ms",
>>>>>>>                         config['MDVariables']['autoCommitIntervalMS']) \
>>>>>>>                 .option("subscribe", "temperature") \
>>>>>>>                 .option("failOnDataLoss", "false") \
>>>>>>>                 .option("includeHeaders", "true") \
>>>>>>>                 .option("startingOffsets", "latest") \
>>>>>>>                 .load() \
>>>>>>>                 .select(from_json(col("value").cast("string"),
>>>>>>>                                   schema).alias("parsed_value"))  ## note the value here
>>>>>>>
>>>>>>>             ## get the individual columns from schema
>>>>>>>             resultM = streamingDataFrame.select(
>>>>>>>                   col("parsed_value.rowkey").alias("rowkey")
>>>>>>>                 , col("parsed_value.timestamp").alias("timestamp")
>>>>>>>                 , col("parsed_value.temperature").alias("temperature"))
>>>>>>>
>>>>>>>             ## Here I do my windowing and tell that I am interested
>>>>>>>             ## in avg("temperature") over timestamp
>>>>>>>
>>>>>>>             result = resultM. \
>>>>>>>                 withWatermark("timestamp", "5 minutes"). \
>>>>>>>                 groupBy(window(resultM.timestamp, "5 minutes",
>>>>>>>                                "5 minutes")). \
>>>>>>>                 avg('temperature'). \
>>>>>>>                 writeStream. \
>>>>>>>                 outputMode('complete'). \
>>>>>>>                 option("numRows", 1000). \
>>>>>>>                 option("truncate", "false"). \
>>>>>>>                 format('console'). \
>>>>>>>                 option('checkpointLocation', checkpoint_path). \
>>>>>>>                 queryName("temperature"). \
>>>>>>>                 start()
>>>>>>>
>>>>>>>         except Exception as e:
>>>>>>>             print(f"""{e}, quitting""")
>>>>>>>             sys.exit(1)
>>>>>>>
>>>>>>>         #print(result.status)
>>>>>>>         #print(result.recentProgress)
>>>>>>>         #print(result.lastProgress)
>>>>>>>
>>>>>>>         result.awaitTermination()
>>>>>>>
>>>>>>>  This works. I attach the py code for you. Have a look at it
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> On Mon, 17 May 2021 at 15:00, Giuseppe Ricci <peppepega...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Mich, Hi all,
>>>>>>>>
>>>>>>>> Thank you for your valuable support. It seems your solution worked!
>>>>>>>>
>>>>>>>> 21/05/17 15:53:38 WARN HDFSBackedStateStoreProvider: The state for
>>>>>>>> version 83 doesn't exist in loadedMaps. Reading snapshot file and delta
>>>>>>>> files if needed...Note that this is normal for the first batch of 
>>>>>>>> starting
>>>>>>>> query.
>>>>>>>> -------------------------------------------
>>>>>>>> Batch: 83
>>>>>>>> -------------------------------------------
>>>>>>>> +------------------------------------------+------------------+
>>>>>>>> |window                                    |avg(temperature)  |
>>>>>>>> +------------------------------------------+------------------+
>>>>>>>> |{2021-05-13 15:02:30, 2021-05-13 15:02:40}|11.90999984741211 |
>>>>>>>> |{2021-05-14 16:04:20, 2021-05-14 16:04:30}|12.859999656677246|
>>>>>>>> |{2021-05-13 16:04:10, 2021-05-13 16:04:20}|18.649999618530273|
>>>>>>>> |{2021-05-14 16:03:30, 2021-05-14 16:03:40}|18.540000915527344|
>>>>>>>> |{2021-05-13 16:01:10, 2021-05-13 16:01:20}|19.889999389648438|
>>>>>>>> |{2021-05-13 16:01:50, 2021-05-13 16:02:00}|16.489999771118164|
>>>>>>>> |{2021-05-14 16:02:30, 2021-05-14 16:02:40}|13.640000343322754|
>>>>>>>>
>>>>>>>>
>>>>>>>> I am trying to save the data to another Kafka topic, but my solution
>>>>>>>> doesn't work:
>>>>>>>>
>>>>>>>> qk = (resultM.
>>>>>>>>       selectExpr("CAST(timestamp AS STRING)", "CAST(temperature AS
>>>>>>>> STRING)") \
>>>>>>>>       .writeStream \
>>>>>>>>       .format("kafka") \
>>>>>>>>       .option("kafka.bootstrap.servers", "localhost:9092") \
>>>>>>>>       .option('checkpointLocation',
>>>>>>>> "/home/kafka/Documenti/confluent/examples-6.1.0-post/clients/cloud/python/kafkaStream")
>>>>>>>> \
>>>>>>>>       .option("topic", "avgtemperature") \
>>>>>>>>       .start())
>>>>>>>>
>>>>>>>> because I receive the error:
>>>>>>>>
>>>>>>>> 21/05/17 15:56:29 WARN StreamingQueryManager: Stopping existing streaming query [id=81f48019-534c-446e-90a5-a90598883370, runId=ad277cbb-e906-4d60-8d9c-0f24285041c6], as a new run is being started.
>>>>>>>> 21/05/17 15:56:29 ERROR MicroBatchExecution: Query [id = 81f48019-534c-446e-90a5-a90598883370, runId = 0fd83640-1176-4695-a7e5-b65717f46a9a] terminated with error
>>>>>>>> org.apache.spark.sql.AnalysisException: Required attribute 'value' not found
>>>>>>>>         at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:59)
>>>>>>>>         at org.apache.spark.sql.kafka010.KafkaStreamingWrite.<init>(KafkaStreamingWrite.scala:42)
>>>>>>>>         at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable$$anon$2.buildForStreaming(KafkaSourceProvider.scala:411)
>>>>>>>>         at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:623)
>>>>>>>>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:144)
>>>>>>>>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:62)
>>>>>>>>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:321)
>>>>>>>>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>>>>>>>
>>>>>>>> Is my solution wrong?
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> PhD. Giuseppe Ricci
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, 15 May 2021 at 23:47, Mich Talebzadeh <
>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> In answer to your question, I ran some tests broadly following your
>>>>>>>>> approach. With regard to your questions:
>>>>>>>>>
>>>>>>>>> "but it does not work well because it does not give a temperature
>>>>>>>>> average as you can see in the attached pic.
>>>>>>>>> Why is the average not calculated on temperature?
>>>>>>>>> How can I view data in each window of 5 minutes and related
>>>>>>>>> average?"
>>>>>>>>>
>>>>>>>>> This is similar to the code you are running:
>>>>>>>>>
>>>>>>>>>             streamingDataFrame = self.spark \
>>>>>>>>>                 .readStream \
>>>>>>>>>                 .format("kafka") \
>>>>>>>>>                 .option("kafka.bootstrap.servers",
>>>>>>>>>                         config['MDVariables']['bootstrapServers']) \
>>>>>>>>>                 .option("schema.registry.url",
>>>>>>>>>                         config['MDVariables']['schemaRegistryURL']) \
>>>>>>>>>                 .option("group.id", config['common']['appName']) \
>>>>>>>>>                 .option("zookeeper.connection.timeout.ms",
>>>>>>>>>                         config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>>>>>>>                 .option("rebalance.backoff.ms",
>>>>>>>>>                         config['MDVariables']['rebalanceBackoffMS']) \
>>>>>>>>>                 .option("zookeeper.session.timeout.ms",
>>>>>>>>>                         config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>>>>>>>                 .option("auto.commit.interval.ms",
>>>>>>>>>                         config['MDVariables']['autoCommitIntervalMS']) \
>>>>>>>>>                 .option("subscribe", "temperature") \
>>>>>>>>>                 .option("failOnDataLoss", "false") \
>>>>>>>>>                 .option("includeHeaders", "false") \
>>>>>>>>>                 .option("startingOffsets", "latest") \
>>>>>>>>>                 .load()
>>>>>>>>>
>>>>>>>>>             streamingDataFrame.printSchema()
>>>>>>>>>             result = streamingDataFrame. \
>>>>>>>>>                 withWatermark("timestamp", "5 minutes"). \
>>>>>>>>>                 groupBy(window(streamingDataFrame.timestamp,
>>>>>>>>>                                "5 minutes", "5 minutes")). \
>>>>>>>>>                 avg(). \
>>>>>>>>>                 writeStream. \
>>>>>>>>>                 outputMode('complete'). \
>>>>>>>>>                 option("numRows", 100). \
>>>>>>>>>                 option("truncate", "false"). \
>>>>>>>>>                 format('console'). \
>>>>>>>>>                 option('checkpointLocation', checkpoint_path). \
>>>>>>>>>                 queryName("temperature"). \
>>>>>>>>>                 start()
>>>>>>>>>
>>>>>>>>> OK
>>>>>>>>>
>>>>>>>>> To simulate the schema of your data, which I believe comprises two
>>>>>>>>> keys, timestamp and temperature, I am sending a single message to Kafka
>>>>>>>>> every minute, with a temperature between 20 and 30 degrees. An example:
>>>>>>>>>
>>>>>>>>> {"timestamp":"2021-05-15T22:16:31", "temperature":29}
>>>>>>>>>
>>>>>>>>> So let us print the schema
>>>>>>>>>
>>>>>>>>> streamingDataFrame.printSchema()
>>>>>>>>>
>>>>>>>>> root
>>>>>>>>>  |-- key: binary (nullable = true)
>>>>>>>>>  |-- value: binary (nullable = true)
>>>>>>>>>  |-- topic: string (nullable = true)
>>>>>>>>>  |-- partition: integer (nullable = true)
>>>>>>>>>  |-- offset: long (nullable = true)
>>>>>>>>>  |-- timestamp: timestamp (nullable = true)
>>>>>>>>>  |-- timestampType: integer (nullable = true)
>>>>>>>>>
>>>>>>>>> There is no temperature column there, because you have not created one
>>>>>>>>> from the JSON (see later). So this is what you get if you run this
>>>>>>>>> code. Note that the batch cycle is 1 minute in my case.
>>>>>>>>>
>>>>>>>>> -------------------------------------------
>>>>>>>>> Batch: 2
>>>>>>>>> -------------------------------------------
>>>>>>>>>
>>>>>>>>> +------------------------------------------+--------------+-----------+------------------+
>>>>>>>>> |window                                    |avg(partition)|avg(offset)|avg(timestampType)|
>>>>>>>>> +------------------------------------------+--------------+-----------+------------------+
>>>>>>>>> |{2021-05-15 22:05:00, 2021-05-15 22:10:00}|7.0           |7071.0     |0.0               |
>>>>>>>>> |{2021-05-15 22:00:00, 2021-05-15 22:05:00}|0.0           |7117.0     |0.0               |
>>>>>>>>> +------------------------------------------+--------------+-----------+------------------+
>>>>>>>>>
>>>>>>>>> -------------------------------------------
>>>>>>>>> Batch: 3
>>>>>>>>> -------------------------------------------
>>>>>>>>>
>>>>>>>>> +------------------------------------------+--------------+-----------+------------------+
>>>>>>>>> |window                                    |avg(partition)|avg(offset)|avg(timestampType)|
>>>>>>>>> +------------------------------------------+--------------+-----------+------------------+
>>>>>>>>> |{2021-05-15 22:05:00, 2021-05-15 22:10:00}|5.5           |7147.5     |0.0               |
>>>>>>>>> |{2021-05-15 22:00:00, 2021-05-15 22:05:00}|0.0           |7117.0     |0.0               |
>>>>>>>>> +------------------------------------------+--------------+-----------+------------------+
>>>>>>>>>
>>>>>>>>> So this is I think what you need to do with your schema
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>             schema = StructType().add("timestamp", TimestampType()) \
>>>>>>>>>                 .add("temperature", IntegerType())
>>>>>>>>>
>>>>>>>>>             streamingDataFrame = self.spark \
>>>>>>>>>                 .readStream \
>>>>>>>>>                 .format("kafka") \
>>>>>>>>>                 .option("kafka.bootstrap.servers",
>>>>>>>>>                         config['MDVariables']['bootstrapServers']) \
>>>>>>>>>                 .option("schema.registry.url",
>>>>>>>>>                         config['MDVariables']['schemaRegistryURL']) \
>>>>>>>>>                 .option("group.id", config['common']['appName']) \
>>>>>>>>>                 .option("zookeeper.connection.timeout.ms",
>>>>>>>>>                         config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>>>>>>>                 .option("rebalance.backoff.ms",
>>>>>>>>>                         config['MDVariables']['rebalanceBackoffMS']) \
>>>>>>>>>                 .option("zookeeper.session.timeout.ms",
>>>>>>>>>                         config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>>>>>>>                 .option("auto.commit.interval.ms",
>>>>>>>>>                         config['MDVariables']['autoCommitIntervalMS']) \
>>>>>>>>>                 .option("subscribe", "temperature") \
>>>>>>>>>                 .option("failOnDataLoss", "false") \
>>>>>>>>>                 .option("includeHeaders", "true") \
>>>>>>>>>                 .option("startingOffsets", "latest") \
>>>>>>>>>                 .load() \
>>>>>>>>>                 .select(from_json(col("value").cast("string"),
>>>>>>>>>                                   schema).alias("parsed_value"))
>>>>>>>>>
>>>>>>>>>             # get columns from struct
>>>>>>>>>             resultM = streamingDataFrame.select(
>>>>>>>>>                   col("parsed_value.timestamp").alias("timestamp")
>>>>>>>>>                 , col("parsed_value.temperature").alias("temperature"))
>>>>>>>>>
>>>>>>>>>             result = resultM. \
>>>>>>>>>                 withWatermark("timestamp", "5 minutes"). \
>>>>>>>>>                 groupBy(window(resultM.timestamp, "5 minutes",
>>>>>>>>>                                "5 minutes")). \
>>>>>>>>>                 avg(). \
>>>>>>>>>                 writeStream. \
>>>>>>>>>                 outputMode('complete'). \
>>>>>>>>>                 option("numRows", 100). \
>>>>>>>>>                 option("truncate", "false"). \
>>>>>>>>>                 format('console'). \
>>>>>>>>>                 option('checkpointLocation', checkpoint_path). \
>>>>>>>>>                 queryName("temperature"). \
>>>>>>>>>                 start()
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> And you will get
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -------------------------------------------
>>>>>>>>> Batch: 1
>>>>>>>>> -------------------------------------------
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |window                                    |avg(temperature)|
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|28.0            |
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>>
>>>>>>>>> -------------------------------------------
>>>>>>>>> Batch: 2
>>>>>>>>> -------------------------------------------
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |window                                    |avg(temperature)|
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>>
>>>>>>>>> -------------------------------------------
>>>>>>>>> Batch: 3
>>>>>>>>> -------------------------------------------
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |window                                    |avg(temperature)|
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |{2021-05-15 22:35:00, 2021-05-15 22:40:00}|22.0            |
>>>>>>>>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>>
>>>>>>>>> -------------------------------------------
>>>>>>>>> Batch: 4
>>>>>>>>> -------------------------------------------
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |window                                    |avg(temperature)|
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |{2021-05-15 22:35:00, 2021-05-15 22:40:00}|26.0            |
>>>>>>>>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>>
>>>>>>>>> -------------------------------------------
>>>>>>>>> Batch: 5
>>>>>>>>> -------------------------------------------
>>>>>>>>> +------------------------------------------+------------------+
>>>>>>>>> |window                                    |avg(temperature)  |
>>>>>>>>> +------------------------------------------+------------------+
>>>>>>>>> |{2021-05-15 22:35:00, 2021-05-15 22:40:00}|27.3              |
>>>>>>>>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5              |
>>>>>>>>> +------------------------------------------+------------------+
>>>>>>>>>
>>>>>>>>> -------------------------------------------
>>>>>>>>> Batch: 6
>>>>>>>>> -------------------------------------------
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |window                                    |avg(temperature)|
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |{2021-05-15 22:35:00, 2021-05-15 22:40:00}|27.75           |
>>>>>>>>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>>
>>>>>>>>> -------------------------------------------
>>>>>>>>> Batch: 7
>>>>>>>>> -------------------------------------------
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |window                                    |avg(temperature)|
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |{2021-05-15 22:35:00, 2021-05-15 22:40:00}|27.0            |
>>>>>>>>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>>
>>>>>>>>> -------------------------------------------
>>>>>>>>> Batch: 8
>>>>>>>>> -------------------------------------------
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |window                                    |avg(temperature)|
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>> |{2021-05-15 22:35:00, 2021-05-15 22:40:00}|27.0            |
>>>>>>>>> |{2021-05-15 22:40:00, 2021-05-15 22:45:00}|26.0            |
>>>>>>>>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
>>>>>>>>> +------------------------------------------+----------------+
>>>>>>>>>
>>>>>>>>> This should be all you need I believe.
>>>>>>>>>
>>>>>>>>> HTH
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>    view my Linkedin profile
>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>>> for any loss, damage or destruction of data or any other property
>>>>>>>>> which may arise from relying on this email's technical content is
>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, 11 May 2021 at 14:42, Giuseppe Ricci <
>>>>>>>>> peppepega...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> As Jayesh suggested, I followed his solution.
>>>>>>>>>> (a) I need the average temperature at fixed minute marks (5, 10,
>>>>>>>>>> 15, etc.), so a tumbling window seems to be the best fit.
>>>>>>>>>> (b) Real sensors may send data with some delay, which can be a
>>>>>>>>>> few seconds.
>>>>>>>>>> So this is my new code (I used a window of 5 minutes):
>>>>>>>>>>
>>>>>>>>>> from pyspark.sql import SparkSession
>>>>>>>>>> from pyspark.sql.functions import window
>>>>>>>>>> from pyspark.sql.types import StringType
>>>>>>>>>>
>>>>>>>>>> # Spark session & context
>>>>>>>>>> spark = (SparkSession
>>>>>>>>>>          .builder
>>>>>>>>>>          .master('local')
>>>>>>>>>>          .appName('TemperatureStreamApp')
>>>>>>>>>>          # Add kafka package
>>>>>>>>>>          .config("spark.jars.packages",
>>>>>>>>>>                  "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
>>>>>>>>>>          .getOrCreate())
>>>>>>>>>>
>>>>>>>>>> sc = spark.sparkContext
>>>>>>>>>>
>>>>>>>>>> # Create stream dataframe setting kafka server, topic and offset option
>>>>>>>>>> df = (spark
>>>>>>>>>>   .readStream
>>>>>>>>>>   .format("kafka")
>>>>>>>>>>   .option("kafka.bootstrap.servers", "localhost:9092") # kafka server
>>>>>>>>>>   .option("subscribe", "temperature") # topic
>>>>>>>>>>   .option("startingOffsets", "earliest") # start from beginning
>>>>>>>>>>   .load())
>>>>>>>>>>
>>>>>>>>>> windowedAvg = df\
>>>>>>>>>>     .withWatermark("timestamp", "5 minutes") \
>>>>>>>>>>     .groupBy(
>>>>>>>>>>     window(df.timestamp, "5 minutes", "5 minutes")).avg()
>>>>>>>>>>
>>>>>>>>>> query = windowedAvg\
>>>>>>>>>>         .writeStream\
>>>>>>>>>>         .outputMode('complete')\
>>>>>>>>>>         .format('console')\
>>>>>>>>>>         .option('truncate', 'false')\
>>>>>>>>>>         .start()
>>>>>>>>>>
>>>>>>>>>> query.awaitTermination()
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> But it does not work well: it does not give a temperature
>>>>>>>>>> average, as you can see in the attached picture.
>>>>>>>>>> Why is the average not calculated on temperature?
>>>>>>>>>> How can I view the data in each 5-minute window and the related
>>>>>>>>>> average?
>>>>>>>>>> Thanks for your help.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> PhD. Giuseppe Ricci
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, 10 May 2021 at 18:14, Lalwani, Jayesh <
>>>>>>>>>> jlalw...@amazon.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> You don’t need to “launch batches” every 5 minutes. You can
>>>>>>>>>>> launch batches every 2 seconds, and aggregate on a window of 5
>>>>>>>>>>> minutes. Spark will read data from the topic every 2 seconds, and
>>>>>>>>>>> keep the data in memory for 5 minutes.
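
To make the idea of small batches feeding a 5-minute aggregate concrete, here is a minimal plain-Python sketch (no Spark involved). It assumes each micro-batch is a list of (epoch_seconds, temperature) pairs and keeps only a running sum and count per window. The names update_state, state and batches are made up for this illustration, and this is not how Spark itself is implemented.

```python
from collections import defaultdict

WINDOW_SECONDS = 300  # 5-minute tumbling window


def update_state(state, batch):
    """Fold one micro-batch into per-window [sum, count] and return the averages."""
    for ts, temp in batch:
        window_start = ts - ts % WINDOW_SECONDS  # align to the 5-minute boundary
        acc = state[window_start]
        acc[0] += temp
        acc[1] += 1
    # Recompute the average of every window seen so far (like 'complete' output).
    return {start: total / count for start, (total, count) in sorted(state.items())}


state = defaultdict(lambda: [0.0, 0])
batches = [
    [(0, 28.0)],    # first reading in window 0
    [(2, 23.0)],    # second reading, same window
    [(300, 22.0)],  # first reading in the next window
]
averages = [update_state(state, b) for b in batches]
for i, avg in enumerate(averages, start=1):
    print("Batch", i, avg)
```

Each batch only updates the windows it touches, which is why the average for a window keeps changing from batch to batch until its data stops arriving.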
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> You need to make a few decisions:
>>>>>>>>>>>
>>>>>>>>>>>    1. Do you want a tumbling window or a rolling (sliding) window?
>>>>>>>>>>>    A tumbling window of 5 minutes will produce an aggregate every
>>>>>>>>>>>    5 minutes, covering the 5 minutes before it. A rolling window of
>>>>>>>>>>>    5 minutes/1 minute will produce an aggregate every 1 minute,
>>>>>>>>>>>    again covering the 5 minutes before it. For example, let’s say
>>>>>>>>>>>    you have data every 2 seconds. A tumbling window will produce a
>>>>>>>>>>>    result on minute 5, 10, 15, 20, .... The minute 5 result will
>>>>>>>>>>>    have data from minutes 0-5, minute 10 will have data from
>>>>>>>>>>>    minutes 5-10, and so on. A rolling window will produce a result
>>>>>>>>>>>    on minute 5, 6, 7, 8, .... Minute 5 will have the aggregate from
>>>>>>>>>>>    minutes 0-5, minute 6 will have the aggregate from minutes 1-6,
>>>>>>>>>>>    and so on. This defines your window. In your code you have
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> window(df_temp.timestamp, "2 minutes", "1 minutes")
>>>>>>>>>>>
>>>>>>>>>>> This is a rolling window. Here the second parameter (2 minutes) is
>>>>>>>>>>> the window interval, and the third parameter (1 minute) is the
>>>>>>>>>>> slide interval. In the above example, it will produce an aggregate
>>>>>>>>>>> every 1 minute for 2 minutes' worth of data.
>>>>>>>>>>>
>>>>>>>>>>> If you define
>>>>>>>>>>>
>>>>>>>>>>> window(df_temp.timestamp, "2 minutes", "2 minutes")
>>>>>>>>>>>
>>>>>>>>>>> This is a tumbling window. It will produce an aggregate every 2
>>>>>>>>>>> minutes, with 2 minutes' worth of data.
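
The difference between the two window() calls above can be sketched in plain Python. The helper windows_for below is hypothetical (it is not a Spark function). It lists every [start, end) window that contains a given event time, assuming windows are aligned to the Unix epoch and ignoring time zones.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def windows_for(event_time, size, slide):
    """Return all [start, end) windows of length `size`, stepped by `slide`, containing event_time."""
    # Latest window start at or before the event, aligned to the slide interval.
    start = event_time - ((event_time - EPOCH) % slide)
    result = []
    # Walk backwards while the window still covers the event.
    while start + size > event_time:
        result.append((start, start + size))
        start -= slide
    return sorted(result)


event = datetime(2021, 5, 15, 22, 33, 20)
# "2 minutes", "2 minutes": tumbling, each event falls into exactly one window
tumbling = windows_for(event, timedelta(minutes=2), timedelta(minutes=2))
# "2 minutes", "1 minutes": rolling, each event falls into two overlapping windows
rolling = windows_for(event, timedelta(minutes=2), timedelta(minutes=1))
print(tumbling)
print(rolling)
```

With a tumbling window every reading counts towards one average. With the rolling window the same reading contributes to size/slide overlapping averages (two in this case).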
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>    2. Can you have late data? How late can data arrive? Usually
>>>>>>>>>>>    streaming systems send data out of order. For example, it could
>>>>>>>>>>>    happen that you get data for t=11:00:00 AM, and then get data
>>>>>>>>>>>    for t=10:59:59 AM. This means that the data is late by 1 second.
>>>>>>>>>>>    What’s the worst-case condition for late data? You need to
>>>>>>>>>>>    define the watermark for late data. In your code, you have
>>>>>>>>>>>    defined a watermark of 2 minutes. For aggregations, the
>>>>>>>>>>>    watermark also defines which windows Spark will keep in memory.
>>>>>>>>>>>    If you define a watermark of 2 minutes, and you have a rolling
>>>>>>>>>>>    window with a slide interval of 1 minute, Spark will keep 2
>>>>>>>>>>>    windows in memory. The watermark interval affects how much
>>>>>>>>>>>    memory will be used by Spark.
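
The late-data rule can be illustrated with a simplified plain-Python model. It assumes the watermark is the largest event time seen so far minus the allowed delay, and that an event is dropped once its window has ended at or before the watermark. Spark advances its watermark per micro-batch rather than per event, so treat this only as a sketch. The function process and the sample timestamps are invented for the example.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def process(events, window_size, delay):
    """Feed event times in arrival order; return (accepted, dropped)."""
    max_event_time = None
    accepted, dropped = [], []
    for t in events:
        # Watermark trails the newest event time seen so far by `delay`.
        watermark = None if max_event_time is None else max_event_time - delay
        window_end = t - ((t - EPOCH) % window_size) + window_size
        if watermark is not None and window_end <= watermark:
            dropped.append(t)   # its window is already finalized
        else:
            accepted.append(t)  # window is still open, the event is counted
        if max_event_time is None or t > max_event_time:
            max_event_time = t
    return accepted, dropped


arrivals = [
    datetime(2021, 5, 10, 11, 0, 0),
    datetime(2021, 5, 10, 10, 59, 59),  # 1 second late: still accepted
    datetime(2021, 5, 10, 11, 5, 0),    # pushes the watermark to 11:03:00
    datetime(2021, 5, 10, 10, 59, 30),  # window ended at 11:00:00: dropped
]
accepted, dropped = process(arrivals, timedelta(minutes=2), timedelta(minutes=2))
print(len(accepted), "accepted;", dropped, "dropped")
```

A longer delay accepts more late readings, but it also means more windows have to stay open, which is the memory trade-off described above.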
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> It might help if you try to follow the example in this guide
>>>>>>>>>>> very carefully:
>>>>>>>>>>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time
>>>>>>>>>>> That is a pretty good example, but you need to follow it event by
>>>>>>>>>>> event very carefully to get all the nuances.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *From: *Giuseppe Ricci <peppepega...@gmail.com>
>>>>>>>>>>> *Date: *Monday, May 10, 2021 at 11:19 AM
>>>>>>>>>>> *To: *"user@spark.apache.org" <user@spark.apache.org>
>>>>>>>>>>> *Subject: *[EXTERNAL] Calculate average from Spark stream
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hi, I'm new to Apache Spark.
>>>>>>>>>>>
>>>>>>>>>>> I'm trying to read data from an Apache Kafka topic (I have a
>>>>>>>>>>> simulated temperature sensor producer which sends data every 2
>>>>>>>>>>> seconds), and every 5 minutes I need to calculate the average
>>>>>>>>>>> temperature. Reading the documentation, I understand I need to use
>>>>>>>>>>> windows, but I'm not able to finalize my code. Can someone help me?
>>>>>>>>>>> How can I launch batches every 5 minutes? My code runs once and
>>>>>>>>>>> finishes. Why can't I find any helpful information in the console
>>>>>>>>>>> about correct execution? See attached picture.
>>>>>>>>>>>
>>>>>>>>>>> This is my code:
>>>>>>>>>>>
>>>>>>>>>>> https://pastebin.com/4S31jEeP
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your precious help.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> PhD. Giuseppe Ricci
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>