Hi Mich, I'm currently testing this on my Mac. Are you able to reproduce this issue?
Note: the code is similar, except that outputMode is set to update. Regarding outputMode: when using aggregation with a watermark, the outputMode should be either append or update; in your code you have used 'complete'. Any comments on this? tia!

On Fri, Mar 17, 2023 at 2:36 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi Karan,

The version tested was 3.1.1. Are you running on Dataproc serverless 3.1.3?

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited

On Thu, 16 Mar 2023 at 23:49, karan alang <karan.al...@gmail.com> wrote:

FYI, the Apache Spark version is 3.1.3.

On Wed, Mar 15, 2023 at 4:34 PM karan alang <karan.al...@gmail.com> wrote:

Hi Mich, this doesn't seem to be working for me; the watermark seems to be getting ignored!

Here is the data put into Kafka:

```
+----------------------------------------------------------------------------------------------------+----+
|value                                                                                                |key |
+----------------------------------------------------------------------------------------------------+----+
|{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"}  |null|
|{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"}  |null|
|{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"}  |null|
|{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"}   |null|
+----------------------------------------------------------------------------------------------------+----+
```

Note: insert_ts specifies when the data was inserted.

Here is the output of the Structured Stream:

```
-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
+-------------------+-------------------+---------------+
|2023-03-15 16:10:00|2023-03-15 16:15:00|27             |
|2023-03-15 15:10:00|2023-03-15 15:15:00|14             |
|2023-03-13 10:10:00|2023-03-13 10:15:00|6              |
+-------------------+-------------------+---------------+
```

Note: I'm summing up the temperatures (for easy verification).

As per the above, all 3 'ts' values are included in the DataFrame, even though I added "ts":"2023-03-13T10:12:00.000-07:00" as the last record. Since the watermark is set to "5 minutes" and max(ts) == 2023-03-15T16:12:00.000-07:00, the record with ts = "2023-03-13T10:12:00.000-07:00" should have been dropped; it is more than 2 days old (i.e. dated 2023-03-13)!

Any ideas what needs to be changed to make this work?
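For reference, a rough sketch of how I understand the watermark cutoff to be computed for the data above (this is my reading of the semantics, not something I have verified against the source; the timestamps are the ones from the Kafka records, with the -07:00 offset dropped for simplicity):

```
from datetime import datetime, timedelta

# Assumption: the watermark advances to (max event time seen so far) - delay,
# and rows with event time older than that cutoff are candidates for being
# dropped in append/update output mode.
max_event_time = datetime.fromisoformat("2023-03-15T16:12:00")   # max(ts) ingested so far
delay = timedelta(minutes=5)                                     # withWatermark("timestamp", "5 minutes")

cutoff = max_event_time - delay
print(cutoff)                                    # 2023-03-15 16:07:00

late_ts = datetime.fromisoformat("2023-03-13T10:12:00")          # the 2-day-old record
print(late_ts < cutoff)                          # True -> expected to be dropped
```

If I understand the output modes correctly, with outputMode('complete') the engine keeps and re-emits all aggregate state on every trigger, so the watermark would not evict anything in that mode; that is partly why I asked about append/update above.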
Here is the code (modified for my requirement, but essentially the same):

```
# imports assumed by the snippet below
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import StructType, StructField, LongType, TimestampType

schema = StructType([
    StructField("temparature", LongType(), False),
    StructField("ts", TimestampType(), False),
    StructField("insert_ts", TimestampType(), False)
])

streamingDataFrame = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("group.id", 'watermark-grp') \
    .option("subscribe", topic) \
    .option("failOnDataLoss", "false") \
    .option("includeHeaders", "true") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema=schema).alias("parsed_value"))

resultC = streamingDataFrame.select(
    col("parsed_value.ts").alias("timestamp"),
    col("parsed_value.temparature").alias("temparature"),
    col("parsed_value.insert_ts").alias("insert_ts"))

resultM = resultC. \
    withWatermark("timestamp", "5 minutes"). \
    groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
    agg({'temparature': 'sum'})

resultMF = resultM. \
    select(col("window.start").alias("startOfWindowFrame"),
           col("window.end").alias("endOfWindowFrame"),
           col("sum(temparature)").alias("Sum_Temperature"))

result = resultMF. \
    writeStream. \
    outputMode('complete'). \
    option("numRows", 1000). \
    option("truncate", "false"). \
    format('console'). \
    option('checkpointLocation', checkpoint_path). \
    queryName("sum_temparature"). \
    start()

result.awaitTermination()
```

On Sun, Mar 12, 2023 at 3:13 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

OK

ts is the timestamp, right?

This is similar code that works out the average temperature with a time frame of 5 minutes.
Note the comments; errors are caught with try/except:

```
try:
    # construct a streaming dataframe streamingDataFrame that subscribes to topic temperature
    streamingDataFrame = self.spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
        .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
        .option("group.id", config['common']['appName']) \
        .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
        .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
        .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
        .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
        .option("subscribe", "temperature") \
        .option("failOnDataLoss", "false") \
        .option("includeHeaders", "true") \
        .option("startingOffsets", "latest") \
        .load() \
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

    resultC = streamingDataFrame.select(
        col("parsed_value.rowkey").alias("rowkey"),
        col("parsed_value.timestamp").alias("timestamp"),
        col("parsed_value.temperature").alias("temperature"))

    """
    We work out the window and the AVG(temperature) in the window's timeframe below.
    This should return the following Dataframe as struct:

    root
     |-- window: struct (nullable = false)
     |    |-- start: timestamp (nullable = true)
     |    |-- end: timestamp (nullable = true)
     |-- avg(temperature): double (nullable = true)
    """
    resultM = resultC. \
        withWatermark("timestamp", "5 minutes"). \
        groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
        avg('temperature')

    # We take the above Dataframe and flatten it to get the columns aliased as
    # "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
    resultMF = resultM. \
        select(
            F.col("window.start").alias("startOfWindowFrame"),
            F.col("window.end").alias("endOfWindowFrame"),
            F.col("avg(temperature)").alias("AVGTemperature"))

    resultMF.printSchema()

    result = resultMF. \
        writeStream. \
        outputMode('complete'). \
        option("numRows", 1000). \
        option("truncate", "false"). \
        format('console'). \
        option('checkpointLocation', checkpoint_path). \
        queryName("temperature"). \
        start()

except Exception as e:
    print(f"""{e}, quitting""")
    sys.exit(1)
```

HTH

On Sat, 11 Mar 2023 at 04:33, karan alang <karan.al...@gmail.com> wrote:

Hi Mich -
Here is the output of the ldf.printSchema() & ldf.show() commands.
ldf.printSchema():

```
root
 |-- applianceName: string (nullable = true)
 |-- timeslot: long (nullable = true)
 |-- customer: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- sentOctets: long (nullable = true)
 |-- recvdOctets: long (nullable = true)
```

ldf.show():

```
+-------------+--------+--------+------------------------------------------+----------+-----------+
|applianceName|timeslot|customer|window                                    |sentOctets|recvdOctets|
+-------------+--------+--------+------------------------------------------+----------+-----------+
|abc1         |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|21459264  |32211859   |
|pqrq         |2797513 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|17775527  |31331093   |
|xyz          |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|12808015  |24191707   |
+-------------+--------+--------+------------------------------------------+----------+-----------+
```

Also, any comment on the outputMode? I've set it to 'update', since I'm using aggregation.

thanks!

On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Just looking at the code, in here:

```
ldf = ldf.groupBy("applianceName", "timeslot", "customer",
                  window(col("ts"), "15 minutes")) \
         .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
         .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
         .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
         .fillna(0)
```

What does ldf.printSchema() return?

HTH

On Fri, 10 Mar 2023 at 07:16, karan alang <karan.al...@gmail.com> wrote:

Hello All -

I have a Structured Streaming job with a trigger of 10 minutes, and I'm using a watermark to account for late-arriving data. However, the watermark is not working: instead of a single record with the total aggregated value, I see 2 records.
Here is the code:

1) Structured Streaming - reading from Kafka every 10 mins:

```
df_stream = self.spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
    .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
    .option("kafka.bootstrap.servers", self.kafkaBrokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("kafka.metadata.max.age.ms", "1000") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .load()
```

2) Calling foreachBatch(self.process). Note - outputMode is set to "update" (I tried outputMode = append as well):

```
# 03/09 ::: outputMode - update instead of append
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
    .outputMode("update") \
    .trigger(processingTime='10 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", self.checkpoint) \
    .foreachBatch(self.process) \
    .start()
```

self.process is where I do the bulk of the processing; it calls the function aggIntfLogs.

In aggIntfLogs I'm using a watermark of 15 minutes and doing a groupBy to calculate the sum of sentOctets & recvdOctets:

```
def aggIntfLogs(ldf):
    if ldf and ldf.count() > 0:
        ldf = ldf.select('applianceName', 'timeslot', 'sentOctets', 'recvdOctets', 'ts', 'customer') \
                 .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
                 .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
                 .withWatermark("ts", "15 minutes")

        ldf = ldf.groupBy("applianceName", "timeslot", "customer",
                          window(col("ts"), "15 minutes")) \
                 .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
                 .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
                 .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
                 .fillna(0)
        return ldf
    return ldf
```

The Dataframe 'ldf' returned from aggIntfLogs is written to a Kafka topic.

I was expecting that using the watermark would account for late-arriving data, i.e. that sentOctets & recvdOctets would be calculated over the consolidated data (including the late data, since it arrives within 15 minutes). However, for some keys (applianceName/timeslot/customer) I'm seeing 2 records, i.e. the aggregation is calculated separately for the records, and I get 2 records instead of a single record accounting for the late data within the watermark.

What needs to be done to fix this & make this work as desired?

tia!
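One thing I'm wondering about (a rough sketch only, not something I've verified): the DataFrame handed to foreachBatch is a static micro-batch, so the groupBy inside aggIntfLogs only aggregates the rows of that one batch, and as far as I understand the withWatermark call there carries no state across batches. A streaming-native version would apply the watermark and windowed groupBy on the streaming DataFrame itself, before writeStream. In the sketch below, parse_value() is a hypothetical placeholder for whatever extracts applianceName/timeslot/customer/sentOctets/recvdOctets/ts from the Kafka value; the other names match the job above, and the sink is the console just to keep the sketch short:

```
from pyspark.sql.functions import col, window
from pyspark.sql.types import LongType

# Sketch (assumption): stateful, watermarked aggregation on the stream itself,
# so window state is kept across micro-batches and late rows within 15 minutes
# are merged into the same window instead of producing a second record.
parsed = parse_value(df_stream)   # hypothetical helper returning the parsed columns

agg_stream = parsed \
    .withColumn('sentOctets', col('sentOctets').cast(LongType())) \
    .withColumn('recvdOctets', col('recvdOctets').cast(LongType())) \
    .withWatermark("ts", "15 minutes") \
    .groupBy("applianceName", "timeslot", "customer",
             window(col("ts"), "15 minutes")) \
    .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
    .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
    .withColumnRenamed('sum(recvdOctets)', 'recvdOctets')

query = agg_stream.writeStream \
    .outputMode("update") \
    .trigger(processingTime='10 minutes') \
    .option("checkpointLocation", self.checkpoint) \
    .format("console") \
    .start()
```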
Here is the Stackoverflow link as well:

https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected