FYI, the Apache Spark version is 3.1.3.
On Wed, Mar 15, 2023 at 4:34 PM karan alang wrote:
> Hi Mich, this doesn't seem to be working for me; the watermark seems to
> be getting ignored!
>
> Here is the data put into Kafka:
>
> ```
> +----------------------------------------------------------------------------------------------------+----+
> |value                                                                                               |key |
> +----------------------------------------------------------------------------------------------------+----+
> |{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"} |null|
> |{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"} |null|
> |{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"} |null|
> |{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"}  |null|
> +----------------------------------------------------------------------------------------------------+----+
> ```
> Note: insert_ts specifies when the data was inserted.
>
> Here is the output of the Structured Stream:
>
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +-------------------+-------------------+---------------+
> |startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
> +-------------------+-------------------+---------------+
> |2023-03-15 16:10:00|2023-03-15 16:15:00|27             |
> |2023-03-15 15:10:00|2023-03-15 15:15:00|14             |
> |2023-03-13 10:10:00|2023-03-13 10:15:00|6              |
> +-------------------+-------------------+---------------+
>
> Note: I'm summing up the temperatures (for easy verification)
>
> As per the above, all 3 'ts' values are included in the DataFrame, even
> though I added "ts":"2023-03-13T10:12:00.000-07:00" as the last record.
> Since the watermark is set to "5 minutes" and the max(ts) ==
> 2023-03-15T16:12:00.000-07:00, the record with
> ts = "2023-03-13T10:12:00.000-07:00" should have been dropped; it is more
> than 2 days old (i.e. dated 2023-03-13)!
>
> Any ideas what needs to be changed to make this work?
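>
> For reference, the cutoff I'd expect from the watermark can be checked with
> plain datetime arithmetic (a quick sketch; timestamps are taken from the
> data above, with the timezone offset dropped for readability):

```python
from datetime import datetime, timedelta

# Sketch: reproduce the expected watermark arithmetic by hand.
# max_event_time is the largest 'ts' seen; the watermark trails it by 5 minutes.
max_event_time = datetime.fromisoformat("2023-03-15T16:12:00")
watermark = max_event_time - timedelta(minutes=5)

# The late record from 2023-03-13 falls well behind the watermark.
late_record_ts = datetime.fromisoformat("2023-03-13T10:12:00")
print(watermark)                   # 2023-03-15 16:07:00
print(late_record_ts < watermark)  # True -> record is behind the watermark
```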
>
> Here is the code (modified for my requirement, but essentially the same)
> ```
>
> from pyspark.sql.functions import col, from_json, window
> from pyspark.sql.types import StructType, StructField, LongType, TimestampType
>
> # note: "temparature" is the (misspelled) field name actually used in the data
> schema = StructType([
>     StructField("temparature", LongType(), False),
>     StructField("ts", TimestampType(), False),
>     StructField("insert_ts", TimestampType(), False)
> ])
>
> streamingDataFrame = spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", kafkaBrokers) \
>     .option("group.id", 'watermark-grp') \
>     .option("subscribe", topic) \
>     .option("failOnDataLoss", "false") \
>     .option("includeHeaders", "true") \
>     .option("startingOffsets", "latest") \
>     .load() \
>     .select(from_json(col("value").cast("string"), schema=schema).alias("parsed_value"))
>
> resultC = streamingDataFrame.select(
>     col("parsed_value.ts").alias("timestamp"),
>     col("parsed_value.temparature").alias("temparature"),
>     col("parsed_value.insert_ts").alias("insert_ts"))
>
> resultM = resultC \
>     .withWatermark("timestamp", "5 minutes") \
>     .groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")) \
>     .agg({'temparature': 'sum'})
>
> resultMF = resultM \
>     .select(col("window.start").alias("startOfWindowFrame"),
>             col("window.end").alias("endOfWindowFrame"),
>             col("sum(temparature)").alias("Sum_Temperature"))
>
> result = resultMF \
>     .writeStream \
>     .outputMode('complete') \
>     .option("numRows", 1000) \
>     .option("truncate", "false") \
>     .format('console') \
>     .option('checkpointLocation', checkpoint_path) \
>     .queryName("sum_temparature") \
>     .start()
>
> result.awaitTermination()
>
> ```
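>
> As a sanity check, the window boundaries in the output above follow simple
> floor arithmetic on the event time; here is a plain-Python sketch of the
> 5-minute tumbling windows that window(timestamp, "5 minutes", "5 minutes")
> produces (the helper name is mine, for illustration only):

```python
from datetime import datetime, timedelta

def window_bounds(ts, minutes=5):
    """Floor a timestamp to the start of its tumbling window and
    return the (start, end) pair, mirroring the windows in the output."""
    start = ts.replace(minute=ts.minute - ts.minute % minutes,
                       second=0, microsecond=0)
    return start, start + timedelta(minutes=minutes)

# The 16:12 event lands in the [16:10, 16:15) window, as in Batch 2.
start, end = window_bounds(datetime.fromisoformat("2023-03-15T16:12:00"))
print(start, "->", end)  # 2023-03-15 16:10:00 -> 2023-03-15 16:15:00
```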
>
>
>
> On Sun, Mar 12, 2023 at 3:13 AM Mich Talebzadeh wrote:
>
>> OK
>>
>> ts is the timestamp, right?
>>
>> This is similar code that works out the average temperature with a time
>> frame of 5 minutes.
>>
>> Note the comments; errors are caught with try:
>>
>> try:
>>     # construct a streaming dataframe streamingDataFrame that
>>     # subscribes to topic temperature
>>     streamingDataFrame = self.spark \
>>         .readStream \
>>         .format("kafka") \
>>         .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>>         .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>         .option("group.id", config['common']['appName']) \
>>         .option("z