Fyi .. apache spark version is 3.1.3

On Wed, Mar 15, 2023 at 4:34 PM karan alang <karan.al...@gmail.com> wrote:

> Hi Mich, this doesn't seem to be working for me .. the watermark seems to
> be getting ignored !
>
> Here is the data put into Kafka :
>
> ```
>
>
> +---------------------------------------------------------------------------------------------------+----+
>
> |value
>                           |key |
>
>
> +---------------------------------------------------------------------------------------------------+----+
>
>
> |{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"}|null|
>
>
> |{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null|
>
>
> |{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null|
>
> |{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"}
> |null|
>
>
> +---------------------------------------------------------------------------------------------------+----+
> ```
> Note :
> insert_ts - specifies when the data was inserted
>
> Here is the output of the Structured Stream:
>
> -------------------------------------------
>
> Batch: 2
>
> -------------------------------------------
>
> +-------------------+-------------------+---------------+
>
> |startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
>
> +-------------------+-------------------+---------------+
>
> |2023-03-15 16:10:00|2023-03-15 16:15:00|27             |
>
> |2023-03-15 15:10:00|2023-03-15 15:15:00|14             |
>
> |2023-03-13 10:10:00|2023-03-13 10:15:00|6              |
>
> +-------------------+-------------------+---------------+
>
> Note: I'm summing up the temperatures (for easy verification)
>
> As per the above - all the 3 'ts' are included in the DataFrame, even when
> I added   "ts":"2023-03-13T10:12:00.000-07:00", as the last record.
> Since the wattermark is set to "5 minutes" and the max(ts) ==
> 2023-03-15T16:12:00.000-07:00
> record with ts = "2023-03-13T10:12:00.000-07:00" should have got dropped,
> it is more than 2 days old (i.e. dated - 2023-03-13)!
>
> Any ideas what needs to be changed to make this work ?
>
> Here is the code (modified for my requirement, but essentially the same)
> ```
>
> schema = StructType([
>             StructField("temparature", LongType(), False),
>             StructField("ts", TimestampType(), False),
>             StructField("insert_ts", TimestampType(), False)
>         ])
>
> streamingDataFrame = spark \
>                 .readStream \
>                 .format("kafka") \
>                 .option("kafka.bootstrap.servers", kafkaBrokers) \
>                 .option("group.id", 'watermark-grp') \
>                 .option("subscribe", topic) \
>                 .option("failOnDataLoss", "false") \
>                 .option("includeHeaders", "true") \
>                 .option("startingOffsets", "latest") \
>                 .load() \
>                 .select(from_json(col("value").cast("string"), 
> schema=schema).alias("parsed_value"))
>
> resultC = streamingDataFrame.select( 
> col("parsed_value.ts").alias("timestamp") \
>                    , col("parsed_value.temparature").alias("temparature"), 
> col("parsed_value.insert_ts").alias("insert_ts"))
>
> resultM = resultC. \
>     withWatermark("timestamp", "5 minutes"). \
>     groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>     agg({'temparature':'sum'})
>
> resultMF = resultM. \
>             
> select(col("window.start").alias("startOfWindowFrame"),col("window.end").alias("endOfWindowFrame")
>  \
>                           , col("sum(temparature)").alias("Sum_Temperature"))
>
> result = resultMF. \
>                      writeStream. \
>                      outputMode('complete'). \
>                      option("numRows", 1000). \
>                      option("truncate", "false"). \
>                      format('console'). \
>                      option('checkpointLocation', checkpoint_path). \
>                      queryName("sum_temparature"). \
>                      start()
>
> result.awaitTermination()
>
> ```
>
>
>
> On Sun, Mar 12, 2023 at 3:13 AM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> OK
>>
>> ts is the timestamp right?
>>
>> This is a similar code that works out the average temperature with time
>> frame of 5 minutes.
>>
>> Note the comments and catch error with try:
>>
>>         try:
>>
>>             # construct a streaming dataframe streamingDataFrame that
>> subscribes to topic temperature
>>             streamingDataFrame = self.spark \
>>                 .readStream \
>>                 .format("kafka") \
>>                 .option("kafka.bootstrap.servers",
>> config['MDVariables']['bootstrapServers'],) \
>>                 .option("schema.registry.url",
>> config['MDVariables']['schemaRegistryURL']) \
>>                 .option("group.id", config['common']['appName']) \
>>                 .option("zookeeper.connection.timeout.ms",
>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>                 .option("rebalance.backoff.ms",
>> config['MDVariables']['rebalanceBackoffMS']) \
>>                 .option("zookeeper.session.timeout.ms",
>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>                 .option("auto.commit.interval.ms",
>> config['MDVariables']['autoCommitIntervalMS']) \
>>                 .option("subscribe", "temperature") \
>>                 .option("failOnDataLoss", "false") \
>>                 .option("includeHeaders", "true") \
>>                 .option("startingOffsets", "latest") \
>>                 .load() \
>>                 .select(from_json(col("value").cast("string"),
>> schema).alias("parsed_value"))
>>
>>
>>             resultC = streamingDataFrame.select( \
>>                      col("parsed_value.rowkey").alias("rowkey") \
>>                    , col("parsed_value.timestamp").alias("timestamp") \
>>                    , col("parsed_value.temperature").alias("temperature"))
>>
>>             """
>>             We work out the window and the AVG(temperature) in the
>> window's timeframe below
>>             This should return back the following Dataframe as struct
>>
>>              root
>>              |-- window: struct (nullable = false)
>>              |    |-- start: timestamp (nullable = true)
>>              |    |-- end: timestamp (nullable = true)
>>              |-- avg(temperature): double (nullable = true)
>>
>>             """
>>             resultM = resultC. \
>>                      withWatermark("timestamp", "5 minutes"). \
>>                      groupBy(window(resultC.timestamp, "5 minutes", "5
>> minutes")). \
>>                      avg('temperature')
>>
>>             # We take the above Dataframe and flatten it to get the
>> columns aliased as "startOfWindowFrame", "endOfWindowFrame" and
>> "AVGTemperature"
>>             resultMF = resultM. \
>>                        select( \
>>
>> F.col("window.start").alias("startOfWindowFrame") \
>>                           , F.col("window.end").alias("endOfWindowFrame")
>> \
>>                           ,
>> F.col("avg(temperature)").alias("AVGTemperature"))
>>
>>             resultMF.printSchema()
>>
>>             result = resultMF. \
>>                      writeStream. \
>>                      outputMode('complete'). \
>>                      option("numRows", 1000). \
>>                      option("truncate", "false"). \
>>                      format('console'). \
>>                      option('checkpointLocation', checkpoint_path). \
>>                      queryName("temperature"). \
>>                      start()
>>
>>         except Exception as e:
>>                 print(f"""{e}, quitting""")
>>                 sys.exit(1)
>>
>>
>>
>> HTH
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 11 Mar 2023 at 04:33, karan alang <karan.al...@gmail.com> wrote:
>>
>>> Hi Mich -
>>> Here is the output of the ldf.printSchema() & ldf.show() commands.
>>>
>>> ldf.printSchema()
>>>
>>> root
>>>  |-- applianceName: string (nullable = true)
>>>  |-- timeslot: long (nullable = true)
>>>  |-- customer: string (nullable = true)
>>>  |-- window: struct (nullable = false)
>>>  |    |-- start: timestamp (nullable = true)
>>>  |    |-- end: timestamp (nullable = true)
>>>  |-- sentOctets: long (nullable = true)
>>>  |-- recvdOctets: long (nullable = true)
>>>
>>>
>>>  ldf.show() :
>>>
>>>
>>>  
>>> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
>>> |applianceName     |timeslot    |customer|window
>>>                          |sentOctets|recvdOctets|
>>>
>>> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
>>> |abc1              |2797514    |cust1     |{2023-03-11 04:15:00,
>>> 2023-03-11 04:30:00}|21459264  |32211859   |
>>> |pqrq              |2797513    |cust1     |{2023-03-11 04:15:00,
>>> 2023-03-11 04:30:00}|17775527  |31331093   |
>>> |xyz                |2797514    |cust1     |{2023-03-11 04:15:00,
>>> 2023-03-11 04:30:00}|12808015  |24191707   |
>>>
>>> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
>>>
>>> Also, any comment on the outputMode ? I've set it to 'update', since I'm
>>> using aggregation.
>>>
>>> thanks!
>>>
>>> On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>>
>>>> Just looking at the code
>>>>
>>>>
>>>> in here
>>>>
>>>>
>>>> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>>                                                              
>>>> window(col("ts"), "15 minutes")) \
>>>>                     .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>>>>                     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>>>                     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>>>                     .fillna(0)
>>>>
>>>> What does ldf.printSchema() returns
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, 10 Mar 2023 at 07:16, karan alang <karan.al...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>> Hello All -
>>>>>
>>>>> I've a structured Streaming job which has a trigger of 10 minutes, and
>>>>> I'm using watermark to account for late data coming in. However, the
>>>>> watermark is not working - and instead of a single record with total
>>>>> aggregated value, I see 2 records.
>>>>>
>>>>> Here is the code :
>>>>>
>>>>> ```
>>>>>
>>>>> 1) StructuredStreaming - Reading from Kafka every 10 mins
>>>>>
>>>>>
>>>>>         df_stream = self.spark.readStream.format('kafka') \
>>>>>             .option("kafka.security.protocol", "SSL") \
>>>>>             .option("kafka.ssl.truststore.location", 
>>>>> self.ssl_truststore_location) \
>>>>>             .option("kafka.ssl.truststore.password", 
>>>>> self.ssl_truststore_password) \
>>>>>             .option("kafka.ssl.keystore.location", 
>>>>> self.ssl_keystore_location_bandwidth_intermediate) \
>>>>>             .option("kafka.ssl.keystore.password", 
>>>>> self.ssl_keystore_password_bandwidth_intermediate) \
>>>>>             .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>>>>             .option("subscribe", topic) \
>>>>>             .option("startingOffsets", "latest") \
>>>>>             .option("failOnDataLoss", "false") \
>>>>>             .option("kafka.metadata.max.age.ms", "1000") \
>>>>>             .option("kafka.ssl.keystore.type", "PKCS12") \
>>>>>             .option("kafka.ssl.truststore.type", "PKCS12") \
>>>>>             .load()
>>>>>
>>>>> 2. calling foreachBatch(self.process)
>>>>>         # note - outputMode is set to "update" (tried setting outputMode 
>>>>> = append as well)
>>>>>
>>>>>         # 03/09 ::: outputMode - update instead of append
>>>>>         query = df_stream.selectExpr("CAST(value AS STRING)", 
>>>>> "timestamp", "topic").writeStream \
>>>>>             .outputMode("update") \
>>>>>             .trigger(processingTime='10 minutes') \
>>>>>             .option("truncate", "false") \
>>>>>             .option("checkpointLocation", self.checkpoint) \
>>>>>             .foreachBatch(self.process) \
>>>>>             .start()
>>>>>
>>>>>
>>>>> self.process - where i do the bulk of the  processing, which calls the 
>>>>> function  'aggIntfLogs'
>>>>>
>>>>> In function aggIntfLogs - i'm using watermark of 15 mins, and doing  
>>>>> groupBy to calculate the sum of sentOctets & recvdOctets
>>>>>
>>>>>
>>>>>         def aggIntfLogs(ldf):
>>>>>             if ldf and ldf.count() > 0:
>>>>>
>>>>>                 ldf = ldf.select('applianceName', 'timeslot', 
>>>>> 'sentOctets', 'recvdOctets','ts', 'customer') \
>>>>>                     .withColumn('sentOctets', 
>>>>> ldf["sentOctets"].cast(LongType())) \
>>>>>                     .withColumn('recvdOctets', 
>>>>> ldf["recvdOctets"].cast(LongType())) \
>>>>>                     .withWatermark("ts", "15 minutes")
>>>>>
>>>>>                 ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>>>                                                              
>>>>> window(col("ts"), "15 minutes")) \
>>>>>                     .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>>>>>                     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>>>>                     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') 
>>>>> \
>>>>>                     .fillna(0)
>>>>>                 return ldf
>>>>>             return ldf
>>>>>
>>>>>
>>>>>         Dataframe 'ldf' returned from the function aggIntfLogs - is 
>>>>> written to Kafka topic
>>>>>
>>>>> ```
>>>>>
>>>>> I was expecting that using the watermark will account for late coming
>>>>> data .. i.e. the sentOctets & recvdOctets are calculated for the
>>>>> consolidated data
>>>>> (including late-coming data, since the late coming data comes within
>>>>> 15 mins), however, I'm seeing 2 records for some of the data (i.e. key -
>>>>> applianceName/timeslot/customer) i.e. the aggregated data is calculated
>>>>> individually for the records and I see 2 records instead of single record
>>>>> accounting for late coming data within watermark.
>>>>>
>>>>> What needs to be done to fix this & make this work as desired?
>>>>>
>>>>> tia!
>>>>>
>>>>>
>>>>> Here is the Stackoverflow link as well -
>>>>>
>>>>>
>>>>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
>>>>>
>>>>>
>>>>>
>>>>>

Reply via email to