> .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
> .option("gro...
>
> option('checkpointLocation', checkpoint_path). \
> queryName("temperature"). \
> start()
>
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +------------------------------------------+----+
> |window                                    |    |
> +------------------------------------------+----+
> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5|
> +------------------------------------------+----+
>
> -------------------------------------------
> Batch: 8
> -------------------------------------------
> +---
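For reference, a minimal sketch of the sink side that would produce console output like the above (not the original code: the console format, the output mode and the name resultDf are assumptions; checkpoint_path and the query name "temperature" come from the fragments quoted above):

# Sketch only: write the windowed averages to the console sink, which prints
# the "Batch: N" tables shown above. resultDf is assumed to hold the aggregates.
query = (resultDf
         .writeStream
         .outputMode("update")                           # or "complete" to reprint all windows
         .format("console")                              # assumed from the Batch output above
         .option("truncate", "false")
         .option("checkpointLocation", checkpoint_path)
         .queryName("temperature")
         .start())
query.awaitTermination()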
> # Create stream dataframe setting kafka server, topic and offset option
> df = (spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")  # kafka server
>   .option("...
>
> ... "2 minutes", "1 minutes")
>
> This is a rolling window. Here the second parameter (2 minutes) is the
> window interval, and the third parameter (1 minute) is the slide interval.
> In the above example, it will produce an aggregate every 1 minute over the
> preceding 2-minute window.
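As a sketch of that kind of sliding-window average (value_df, timestamp and temperature are assumed names here, not necessarily what the original code uses):

from pyspark.sql.functions import window, avg, col

# Sketch: average temperature over a 2-minute window that slides every 1 minute,
# so each event contributes to two overlapping windows.
windowed_avg = (value_df
                .groupBy(window(col("timestamp"), "2 minutes", "1 minutes"))
                .agg(avg(col("temperature")).alias("avg_temperature")))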
> ... define the watermark for late data. In your code, you have defined a
> watermark of 2 minutes. For aggregations, the watermark also defines which
> windows Spark will keep in memory. If you define a watermark of 2 minutes,
> and you have a rolling window w...
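Extending the sketch above, the watermark is declared on the event-time column before the aggregation; Spark then drops window state once a window's end time falls more than 2 minutes behind the latest event time seen (again, the column names are assumptions):

from pyspark.sql.functions import window, avg, col

# Sketch: a 2-minute watermark bounds how long Spark keeps window state around
# waiting for late events before a window's result is finalized.
windowed_avg = (value_df
                .withWatermark("timestamp", "2 minutes")
                .groupBy(window(col("timestamp"), "2 minutes", "1 minutes"))
                .agg(avg(col("temperature")).alias("avg_temperature")))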
...example, but you need to follow it event by event very
carefully to get all the nuances.
From: Giuseppe Ricci
Date: Monday, May 10, 2021 at 11:19 AM
To: "user@spark.apache.org"
Subject: [EXTERNAL] Calculate average from Spark stream
Hi Giuseppe,
Just looked over your PySpark code. You are doing Spark Structured
Streaming (SSS).
Your Kafka topic sends messages every two seconds, and regardless you want
to enrich the data every 5 minutes. In other words, we wait for 5 minutes to
build the batch.
You can either wait for 5 minut...
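If the intent is simply "produce the averages every 5 minutes", one common way is a processing-time trigger on the sink, e.g. (a sketch reusing the assumed windowed_avg dataframe and checkpoint_path from above, not your exact code):

# Sketch: a 5-minute processing-time trigger starts a micro-batch every 5 minutes,
# regardless of the 2-second arrival rate on the Kafka topic.
query = (windowed_avg
         .writeStream
         .outputMode("update")
         .format("console")
         .option("checkpointLocation", checkpoint_path)
         .trigger(processingTime="5 minutes")
         .start())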