Hi Mich,
I'm currently testing this on my mac ..  are you able to reproduce this
issue ?

Note - the code is similar .. except outputMode is set to update.
wrt outputMode - when using aggregation + watermark, the outputMode should
be either append Or update, in your code - you have used 'complete'

any comments on this ?

tia!


On Fri, Mar 17, 2023 at 2:36 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi Karan,
>
> The version tested was 3.1.1. Are you running on Dataproc serverless
> 3.1.3?
>
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 16 Mar 2023 at 23:49, karan alang <karan.al...@gmail.com> wrote:
>
>> Fyi .. apache spark version is 3.1.3
>>
>> On Wed, Mar 15, 2023 at 4:34 PM karan alang <karan.al...@gmail.com>
>> wrote:
>>
>>> Hi Mich, this doesn't seem to be working for me .. the watermark seems
>>> to be getting ignored !
>>>
>>> Here is the data put into Kafka :
>>>
>>> ```
>>>
>>>
>>> +---------------------------------------------------------------------------------------------------+----+
>>>
>>> |value
>>>                             |key |
>>>
>>>
>>> +---------------------------------------------------------------------------------------------------+----+
>>>
>>>
>>> |{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"}|null|
>>>
>>>
>>> |{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null|
>>>
>>>
>>> |{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null|
>>>
>>> |{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"}
>>> |null|
>>>
>>>
>>> +---------------------------------------------------------------------------------------------------+----+
>>> ```
>>> Note :
>>> insert_ts - specifies when the data was inserted
>>>
>>> Here is the output of the Structured Stream:
>>>
>>> -------------------------------------------
>>>
>>> Batch: 2
>>>
>>> -------------------------------------------
>>>
>>> +-------------------+-------------------+---------------+
>>>
>>> |startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
>>>
>>> +-------------------+-------------------+---------------+
>>>
>>> |2023-03-15 16:10:00|2023-03-15 16:15:00|27             |
>>>
>>> |2023-03-15 15:10:00|2023-03-15 15:15:00|14             |
>>>
>>> |2023-03-13 10:10:00|2023-03-13 10:15:00|6              |
>>>
>>> +-------------------+-------------------+---------------+
>>>
>>> Note: I'm summing up the temperatures (for easy verification)
>>>
>>> As per the above - all the 3 'ts' are included in the DataFrame, even
>>> when I added   "ts":"2023-03-13T10:12:00.000-07:00", as the last record.
>>> Since the wattermark is set to "5 minutes" and the max(ts) ==
>>> 2023-03-15T16:12:00.000-07:00
>>> record with ts = "2023-03-13T10:12:00.000-07:00" should have got
>>> dropped, it is more than 2 days old (i.e. dated - 2023-03-13)!
>>>
>>> Any ideas what needs to be changed to make this work ?
>>>
>>> Here is the code (modified for my requirement, but essentially the same)
>>> ```
>>>
>>> schema = StructType([
>>>             StructField("temparature", LongType(), False),
>>>             StructField("ts", TimestampType(), False),
>>>             StructField("insert_ts", TimestampType(), False)
>>>         ])
>>>
>>> streamingDataFrame = spark \
>>>                 .readStream \
>>>                 .format("kafka") \
>>>                 .option("kafka.bootstrap.servers", kafkaBrokers) \
>>>                 .option("group.id", 'watermark-grp') \
>>>                 .option("subscribe", topic) \
>>>                 .option("failOnDataLoss", "false") \
>>>                 .option("includeHeaders", "true") \
>>>                 .option("startingOffsets", "latest") \
>>>                 .load() \
>>>                 .select(from_json(col("value").cast("string"), 
>>> schema=schema).alias("parsed_value"))
>>>
>>> resultC = streamingDataFrame.select( 
>>> col("parsed_value.ts").alias("timestamp") \
>>>                    , col("parsed_value.temparature").alias("temparature"), 
>>> col("parsed_value.insert_ts").alias("insert_ts"))
>>>
>>> resultM = resultC. \
>>>     withWatermark("timestamp", "5 minutes"). \
>>>     groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>>>     agg({'temparature':'sum'})
>>>
>>> resultMF = resultM. \
>>>             
>>> select(col("window.start").alias("startOfWindowFrame"),col("window.end").alias("endOfWindowFrame")
>>>  \
>>>                           , 
>>> col("sum(temparature)").alias("Sum_Temperature"))
>>>
>>> result = resultMF. \
>>>                      writeStream. \
>>>                      outputMode('complete'). \
>>>                      option("numRows", 1000). \
>>>                      option("truncate", "false"). \
>>>                      format('console'). \
>>>                      option('checkpointLocation', checkpoint_path). \
>>>                      queryName("sum_temparature"). \
>>>                      start()
>>>
>>> result.awaitTermination()
>>>
>>> ```
>>>
>>>
>>>
>>> On Sun, Mar 12, 2023 at 3:13 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> OK
>>>>
>>>> ts is the timestamp right?
>>>>
>>>> This is a similar code that works out the average temperature with time
>>>> frame of 5 minutes.
>>>>
>>>> Note the comments and catch error with try:
>>>>
>>>>         try:
>>>>
>>>>             # construct a streaming dataframe streamingDataFrame that
>>>> subscribes to topic temperature
>>>>             streamingDataFrame = self.spark \
>>>>                 .readStream \
>>>>                 .format("kafka") \
>>>>                 .option("kafka.bootstrap.servers",
>>>> config['MDVariables']['bootstrapServers'],) \
>>>>                 .option("schema.registry.url",
>>>> config['MDVariables']['schemaRegistryURL']) \
>>>>                 .option("group.id", config['common']['appName']) \
>>>>                 .option("zookeeper.connection.timeout.ms",
>>>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>>                 .option("rebalance.backoff.ms",
>>>> config['MDVariables']['rebalanceBackoffMS']) \
>>>>                 .option("zookeeper.session.timeout.ms",
>>>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>>                 .option("auto.commit.interval.ms",
>>>> config['MDVariables']['autoCommitIntervalMS']) \
>>>>                 .option("subscribe", "temperature") \
>>>>                 .option("failOnDataLoss", "false") \
>>>>                 .option("includeHeaders", "true") \
>>>>                 .option("startingOffsets", "latest") \
>>>>                 .load() \
>>>>                 .select(from_json(col("value").cast("string"),
>>>> schema).alias("parsed_value"))
>>>>
>>>>
>>>>             resultC = streamingDataFrame.select( \
>>>>                      col("parsed_value.rowkey").alias("rowkey") \
>>>>                    , col("parsed_value.timestamp").alias("timestamp") \
>>>>                    ,
>>>> col("parsed_value.temperature").alias("temperature"))
>>>>
>>>>             """
>>>>             We work out the window and the AVG(temperature) in the
>>>> window's timeframe below
>>>>             This should return back the following Dataframe as struct
>>>>
>>>>              root
>>>>              |-- window: struct (nullable = false)
>>>>              |    |-- start: timestamp (nullable = true)
>>>>              |    |-- end: timestamp (nullable = true)
>>>>              |-- avg(temperature): double (nullable = true)
>>>>
>>>>             """
>>>>             resultM = resultC. \
>>>>                      withWatermark("timestamp", "5 minutes"). \
>>>>                      groupBy(window(resultC.timestamp, "5 minutes", "5
>>>> minutes")). \
>>>>                      avg('temperature')
>>>>
>>>>             # We take the above Dataframe and flatten it to get the
>>>> columns aliased as "startOfWindowFrame", "endOfWindowFrame" and
>>>> "AVGTemperature"
>>>>             resultMF = resultM. \
>>>>                        select( \
>>>>
>>>> F.col("window.start").alias("startOfWindowFrame") \
>>>>                           ,
>>>> F.col("window.end").alias("endOfWindowFrame") \
>>>>                           ,
>>>> F.col("avg(temperature)").alias("AVGTemperature"))
>>>>
>>>>             resultMF.printSchema()
>>>>
>>>>             result = resultMF. \
>>>>                      writeStream. \
>>>>                      outputMode('complete'). \
>>>>                      option("numRows", 1000). \
>>>>                      option("truncate", "false"). \
>>>>                      format('console'). \
>>>>                      option('checkpointLocation', checkpoint_path). \
>>>>                      queryName("temperature"). \
>>>>                      start()
>>>>
>>>>         except Exception as e:
>>>>                 print(f"""{e}, quitting""")
>>>>                 sys.exit(1)
>>>>
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, 11 Mar 2023 at 04:33, karan alang <karan.al...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Mich -
>>>>> Here is the output of the ldf.printSchema() & ldf.show() commands.
>>>>>
>>>>> ldf.printSchema()
>>>>>
>>>>> root
>>>>>  |-- applianceName: string (nullable = true)
>>>>>  |-- timeslot: long (nullable = true)
>>>>>  |-- customer: string (nullable = true)
>>>>>  |-- window: struct (nullable = false)
>>>>>  |    |-- start: timestamp (nullable = true)
>>>>>  |    |-- end: timestamp (nullable = true)
>>>>>  |-- sentOctets: long (nullable = true)
>>>>>  |-- recvdOctets: long (nullable = true)
>>>>>
>>>>>
>>>>>  ldf.show() :
>>>>>
>>>>>
>>>>>  
>>>>> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
>>>>> |applianceName     |timeslot    |customer|window
>>>>>                            |sentOctets|recvdOctets|
>>>>>
>>>>> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
>>>>> |abc1              |2797514    |cust1     |{2023-03-11 04:15:00,
>>>>> 2023-03-11 04:30:00}|21459264  |32211859   |
>>>>> |pqrq              |2797513    |cust1     |{2023-03-11 04:15:00,
>>>>> 2023-03-11 04:30:00}|17775527  |31331093   |
>>>>> |xyz                |2797514    |cust1     |{2023-03-11 04:15:00,
>>>>> 2023-03-11 04:30:00}|12808015  |24191707   |
>>>>>
>>>>> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
>>>>>
>>>>> Also, any comment on the outputMode ? I've set it to 'update', since
>>>>> I'm using aggregation.
>>>>>
>>>>> thanks!
>>>>>
>>>>> On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>> Just looking at the code
>>>>>>
>>>>>>
>>>>>> in here
>>>>>>
>>>>>>
>>>>>> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>>>>                                                              
>>>>>> window(col("ts"), "15 minutes")) \
>>>>>>                     .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>>>>>>                     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>>>>>                     .withColumnRenamed('sum(recvdOctets)', 
>>>>>> 'recvdOctets') \
>>>>>>                     .fillna(0)
>>>>>>
>>>>>> What does ldf.printSchema() returns
>>>>>>
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>>
>>>>>>    view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>> may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, 10 Mar 2023 at 07:16, karan alang <karan.al...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> Hello All -
>>>>>>>
>>>>>>> I've a structured Streaming job which has a trigger of 10 minutes,
>>>>>>> and I'm using watermark to account for late data coming in. However, the
>>>>>>> watermark is not working - and instead of a single record with total
>>>>>>> aggregated value, I see 2 records.
>>>>>>>
>>>>>>> Here is the code :
>>>>>>>
>>>>>>> ```
>>>>>>>
>>>>>>> 1) StructuredStreaming - Reading from Kafka every 10 mins
>>>>>>>
>>>>>>>
>>>>>>>         df_stream = self.spark.readStream.format('kafka') \
>>>>>>>             .option("kafka.security.protocol", "SSL") \
>>>>>>>             .option("kafka.ssl.truststore.location", 
>>>>>>> self.ssl_truststore_location) \
>>>>>>>             .option("kafka.ssl.truststore.password", 
>>>>>>> self.ssl_truststore_password) \
>>>>>>>             .option("kafka.ssl.keystore.location", 
>>>>>>> self.ssl_keystore_location_bandwidth_intermediate) \
>>>>>>>             .option("kafka.ssl.keystore.password", 
>>>>>>> self.ssl_keystore_password_bandwidth_intermediate) \
>>>>>>>             .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>>>>>>             .option("subscribe", topic) \
>>>>>>>             .option("startingOffsets", "latest") \
>>>>>>>             .option("failOnDataLoss", "false") \
>>>>>>>             .option("kafka.metadata.max.age.ms", "1000") \
>>>>>>>             .option("kafka.ssl.keystore.type", "PKCS12") \
>>>>>>>             .option("kafka.ssl.truststore.type", "PKCS12") \
>>>>>>>             .load()
>>>>>>>
>>>>>>> 2. calling foreachBatch(self.process)
>>>>>>>         # note - outputMode is set to "update" (tried setting 
>>>>>>> outputMode = append as well)
>>>>>>>
>>>>>>>         # 03/09 ::: outputMode - update instead of append
>>>>>>>         query = df_stream.selectExpr("CAST(value AS STRING)", 
>>>>>>> "timestamp", "topic").writeStream \
>>>>>>>             .outputMode("update") \
>>>>>>>             .trigger(processingTime='10 minutes') \
>>>>>>>             .option("truncate", "false") \
>>>>>>>             .option("checkpointLocation", self.checkpoint) \
>>>>>>>             .foreachBatch(self.process) \
>>>>>>>             .start()
>>>>>>>
>>>>>>>
>>>>>>> self.process - where i do the bulk of the  processing, which calls the 
>>>>>>> function  'aggIntfLogs'
>>>>>>>
>>>>>>> In function aggIntfLogs - i'm using watermark of 15 mins, and doing  
>>>>>>> groupBy to calculate the sum of sentOctets & recvdOctets
>>>>>>>
>>>>>>>
>>>>>>>         def aggIntfLogs(ldf):
>>>>>>>             if ldf and ldf.count() > 0:
>>>>>>>
>>>>>>>                 ldf = ldf.select('applianceName', 'timeslot', 
>>>>>>> 'sentOctets', 'recvdOctets','ts', 'customer') \
>>>>>>>                     .withColumn('sentOctets', 
>>>>>>> ldf["sentOctets"].cast(LongType())) \
>>>>>>>                     .withColumn('recvdOctets', 
>>>>>>> ldf["recvdOctets"].cast(LongType())) \
>>>>>>>                     .withWatermark("ts", "15 minutes")
>>>>>>>
>>>>>>>                 ldf = ldf.groupBy("applianceName", "timeslot", 
>>>>>>> "customer",
>>>>>>>                                                              
>>>>>>> window(col("ts"), "15 minutes")) \
>>>>>>>                     .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>>>>>>>                     .withColumnRenamed('sum(sentOctets)', 'sentOctets') 
>>>>>>> \
>>>>>>>                     .withColumnRenamed('sum(recvdOctets)', 
>>>>>>> 'recvdOctets') \
>>>>>>>                     .fillna(0)
>>>>>>>                 return ldf
>>>>>>>             return ldf
>>>>>>>
>>>>>>>
>>>>>>>         Dataframe 'ldf' returned from the function aggIntfLogs - is 
>>>>>>> written to Kafka topic
>>>>>>>
>>>>>>> ```
>>>>>>>
>>>>>>> I was expecting that using the watermark will account for late
>>>>>>> coming data .. i.e. the sentOctets & recvdOctets are calculated for the
>>>>>>> consolidated data
>>>>>>> (including late-coming data, since the late coming data comes within
>>>>>>> 15 mins), however, I'm seeing 2 records for some of the data (i.e. key -
>>>>>>> applianceName/timeslot/customer) i.e. the aggregated data is calculated
>>>>>>> individually for the records and I see 2 records instead of single 
>>>>>>> record
>>>>>>> accounting for late coming data within watermark.
>>>>>>>
>>>>>>> What needs to be done to fix this & make this work as desired?
>>>>>>>
>>>>>>> tia!
>>>>>>>
>>>>>>>
>>>>>>> Here is the Stackoverflow link as well -
>>>>>>>
>>>>>>>
>>>>>>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>

Reply via email to