Hi Karan, The version tested was 3.1.1. Are you running on Dataproc serverless 3.1.3?
Mich Talebzadeh, Lead Solutions Architect/Engineering Lead Palantir Technologies Limited view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Thu, 16 Mar 2023 at 23:49, karan alang <karan.al...@gmail.com> wrote: > Fyi .. apache spark version is 3.1.3 > > On Wed, Mar 15, 2023 at 4:34 PM karan alang <karan.al...@gmail.com> wrote: > >> Hi Mich, this doesn't seem to be working for me .. the watermark seems to >> be getting ignored ! >> >> Here is the data put into Kafka : >> >> ``` >> >> >> +---------------------------------------------------------------------------------------------------+----+ >> >> |value >> |key | >> >> >> +---------------------------------------------------------------------------------------------------+----+ >> >> >> |{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"}|null| >> >> >> |{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null| >> >> >> |{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null| >> >> |{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"} >> |null| >> >> >> +---------------------------------------------------------------------------------------------------+----+ >> ``` >> Note : >> insert_ts - specifies when the data was inserted >> >> Here is the output of the Structured Stream: >> >> ------------------------------------------- >> >> Batch: 2 >> >> ------------------------------------------- >> >> +-------------------+-------------------+---------------+ >> >> |startOfWindowFrame |endOfWindowFrame |Sum_Temperature| >> >> +-------------------+-------------------+---------------+ >> >> |2023-03-15 16:10:00|2023-03-15 16:15:00|27 | >> >> |2023-03-15 15:10:00|2023-03-15 15:15:00|14 | >> >> |2023-03-13 10:10:00|2023-03-13 10:15:00|6 | >> >> +-------------------+-------------------+---------------+ >> >> Note: I'm summing up the temperatures (for easy verification) >> >> As per the above - all the 3 'ts' are included in the DataFrame, even >> when I added "ts":"2023-03-13T10:12:00.000-07:00", as the last record. >> Since the wattermark is set to "5 minutes" and the max(ts) == >> 2023-03-15T16:12:00.000-07:00 >> record with ts = "2023-03-13T10:12:00.000-07:00" should have got >> dropped, it is more than 2 days old (i.e. dated - 2023-03-13)! >> >> Any ideas what needs to be changed to make this work ? >> >> Here is the code (modified for my requirement, but essentially the same) >> ``` >> >> schema = StructType([ >> StructField("temparature", LongType(), False), >> StructField("ts", TimestampType(), False), >> StructField("insert_ts", TimestampType(), False) >> ]) >> >> streamingDataFrame = spark \ >> .readStream \ >> .format("kafka") \ >> .option("kafka.bootstrap.servers", kafkaBrokers) \ >> .option("group.id", 'watermark-grp') \ >> .option("subscribe", topic) \ >> .option("failOnDataLoss", "false") \ >> .option("includeHeaders", "true") \ >> .option("startingOffsets", "latest") \ >> .load() \ >> .select(from_json(col("value").cast("string"), >> schema=schema).alias("parsed_value")) >> >> resultC = streamingDataFrame.select( >> col("parsed_value.ts").alias("timestamp") \ >> , col("parsed_value.temparature").alias("temparature"), >> col("parsed_value.insert_ts").alias("insert_ts")) >> >> resultM = resultC. \ >> withWatermark("timestamp", "5 minutes"). \ >> groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \ >> agg({'temparature':'sum'}) >> >> resultMF = resultM. \ >> >> select(col("window.start").alias("startOfWindowFrame"),col("window.end").alias("endOfWindowFrame") >> \ >> , col("sum(temparature)").alias("Sum_Temperature")) >> >> result = resultMF. \ >> writeStream. \ >> outputMode('complete'). \ >> option("numRows", 1000). \ >> option("truncate", "false"). \ >> format('console'). \ >> option('checkpointLocation', checkpoint_path). \ >> queryName("sum_temparature"). \ >> start() >> >> result.awaitTermination() >> >> ``` >> >> >> >> On Sun, Mar 12, 2023 at 3:13 AM Mich Talebzadeh < >> mich.talebza...@gmail.com> wrote: >> >>> OK >>> >>> ts is the timestamp right? >>> >>> This is a similar code that works out the average temperature with time >>> frame of 5 minutes. >>> >>> Note the comments and catch error with try: >>> >>> try: >>> >>> # construct a streaming dataframe streamingDataFrame that >>> subscribes to topic temperature >>> streamingDataFrame = self.spark \ >>> .readStream \ >>> .format("kafka") \ >>> .option("kafka.bootstrap.servers", >>> config['MDVariables']['bootstrapServers'],) \ >>> .option("schema.registry.url", >>> config['MDVariables']['schemaRegistryURL']) \ >>> .option("group.id", config['common']['appName']) \ >>> .option("zookeeper.connection.timeout.ms", >>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \ >>> .option("rebalance.backoff.ms", >>> config['MDVariables']['rebalanceBackoffMS']) \ >>> .option("zookeeper.session.timeout.ms", >>> config['MDVariables']['zookeeperSessionTimeOutMs']) \ >>> .option("auto.commit.interval.ms", >>> config['MDVariables']['autoCommitIntervalMS']) \ >>> .option("subscribe", "temperature") \ >>> .option("failOnDataLoss", "false") \ >>> .option("includeHeaders", "true") \ >>> .option("startingOffsets", "latest") \ >>> .load() \ >>> .select(from_json(col("value").cast("string"), >>> schema).alias("parsed_value")) >>> >>> >>> resultC = streamingDataFrame.select( \ >>> col("parsed_value.rowkey").alias("rowkey") \ >>> , col("parsed_value.timestamp").alias("timestamp") \ >>> , >>> col("parsed_value.temperature").alias("temperature")) >>> >>> """ >>> We work out the window and the AVG(temperature) in the >>> window's timeframe below >>> This should return back the following Dataframe as struct >>> >>> root >>> |-- window: struct (nullable = false) >>> | |-- start: timestamp (nullable = true) >>> | |-- end: timestamp (nullable = true) >>> |-- avg(temperature): double (nullable = true) >>> >>> """ >>> resultM = resultC. \ >>> withWatermark("timestamp", "5 minutes"). \ >>> groupBy(window(resultC.timestamp, "5 minutes", "5 >>> minutes")). \ >>> avg('temperature') >>> >>> # We take the above Dataframe and flatten it to get the >>> columns aliased as "startOfWindowFrame", "endOfWindowFrame" and >>> "AVGTemperature" >>> resultMF = resultM. \ >>> select( \ >>> >>> F.col("window.start").alias("startOfWindowFrame") \ >>> , >>> F.col("window.end").alias("endOfWindowFrame") \ >>> , >>> F.col("avg(temperature)").alias("AVGTemperature")) >>> >>> resultMF.printSchema() >>> >>> result = resultMF. \ >>> writeStream. \ >>> outputMode('complete'). \ >>> option("numRows", 1000). \ >>> option("truncate", "false"). \ >>> format('console'). \ >>> option('checkpointLocation', checkpoint_path). \ >>> queryName("temperature"). \ >>> start() >>> >>> except Exception as e: >>> print(f"""{e}, quitting""") >>> sys.exit(1) >>> >>> >>> >>> HTH >>> >>> >>> view my Linkedin profile >>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>> >>> >>> https://en.everybodywiki.com/Mich_Talebzadeh >>> >>> >>> >>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>> any loss, damage or destruction of data or any other property which may >>> arise from relying on this email's technical content is explicitly >>> disclaimed. The author will in no case be liable for any monetary damages >>> arising from such loss, damage or destruction. >>> >>> >>> >>> >>> On Sat, 11 Mar 2023 at 04:33, karan alang <karan.al...@gmail.com> wrote: >>> >>>> Hi Mich - >>>> Here is the output of the ldf.printSchema() & ldf.show() commands. >>>> >>>> ldf.printSchema() >>>> >>>> root >>>> |-- applianceName: string (nullable = true) >>>> |-- timeslot: long (nullable = true) >>>> |-- customer: string (nullable = true) >>>> |-- window: struct (nullable = false) >>>> | |-- start: timestamp (nullable = true) >>>> | |-- end: timestamp (nullable = true) >>>> |-- sentOctets: long (nullable = true) >>>> |-- recvdOctets: long (nullable = true) >>>> >>>> >>>> ldf.show() : >>>> >>>> >>>> >>>> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+ >>>> |applianceName |timeslot |customer|window >>>> |sentOctets|recvdOctets| >>>> >>>> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+ >>>> |abc1 |2797514 |cust1 |{2023-03-11 04:15:00, >>>> 2023-03-11 04:30:00}|21459264 |32211859 | >>>> |pqrq |2797513 |cust1 |{2023-03-11 04:15:00, >>>> 2023-03-11 04:30:00}|17775527 |31331093 | >>>> |xyz |2797514 |cust1 |{2023-03-11 04:15:00, >>>> 2023-03-11 04:30:00}|12808015 |24191707 | >>>> >>>> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+ >>>> >>>> Also, any comment on the outputMode ? I've set it to 'update', since >>>> I'm using aggregation. >>>> >>>> thanks! >>>> >>>> On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh < >>>> mich.talebza...@gmail.com> wrote: >>>> >>>>> >>>>> Just looking at the code >>>>> >>>>> >>>>> in here >>>>> >>>>> >>>>> ldf = ldf.groupBy("applianceName", "timeslot", "customer", >>>>> >>>>> window(col("ts"), "15 minutes")) \ >>>>> .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \ >>>>> .withColumnRenamed('sum(sentOctets)', 'sentOctets') \ >>>>> .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') >>>>> \ >>>>> .fillna(0) >>>>> >>>>> What does ldf.printSchema() returns >>>>> >>>>> >>>>> HTH >>>>> >>>>> >>>>> view my Linkedin profile >>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>> >>>>> >>>>> https://en.everybodywiki.com/Mich_Talebzadeh >>>>> >>>>> >>>>> >>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>>>> any loss, damage or destruction of data or any other property which may >>>>> arise from relying on this email's technical content is explicitly >>>>> disclaimed. The author will in no case be liable for any monetary damages >>>>> arising from such loss, damage or destruction. >>>>> >>>>> >>>>> >>>>> >>>>> On Fri, 10 Mar 2023 at 07:16, karan alang <karan.al...@gmail.com> >>>>> wrote: >>>>> >>>>>> >>>>>> Hello All - >>>>>> >>>>>> I've a structured Streaming job which has a trigger of 10 minutes, >>>>>> and I'm using watermark to account for late data coming in. However, the >>>>>> watermark is not working - and instead of a single record with total >>>>>> aggregated value, I see 2 records. >>>>>> >>>>>> Here is the code : >>>>>> >>>>>> ``` >>>>>> >>>>>> 1) StructuredStreaming - Reading from Kafka every 10 mins >>>>>> >>>>>> >>>>>> df_stream = self.spark.readStream.format('kafka') \ >>>>>> .option("kafka.security.protocol", "SSL") \ >>>>>> .option("kafka.ssl.truststore.location", >>>>>> self.ssl_truststore_location) \ >>>>>> .option("kafka.ssl.truststore.password", >>>>>> self.ssl_truststore_password) \ >>>>>> .option("kafka.ssl.keystore.location", >>>>>> self.ssl_keystore_location_bandwidth_intermediate) \ >>>>>> .option("kafka.ssl.keystore.password", >>>>>> self.ssl_keystore_password_bandwidth_intermediate) \ >>>>>> .option("kafka.bootstrap.servers", self.kafkaBrokers) \ >>>>>> .option("subscribe", topic) \ >>>>>> .option("startingOffsets", "latest") \ >>>>>> .option("failOnDataLoss", "false") \ >>>>>> .option("kafka.metadata.max.age.ms", "1000") \ >>>>>> .option("kafka.ssl.keystore.type", "PKCS12") \ >>>>>> .option("kafka.ssl.truststore.type", "PKCS12") \ >>>>>> .load() >>>>>> >>>>>> 2. calling foreachBatch(self.process) >>>>>> # note - outputMode is set to "update" (tried setting outputMode >>>>>> = append as well) >>>>>> >>>>>> # 03/09 ::: outputMode - update instead of append >>>>>> query = df_stream.selectExpr("CAST(value AS STRING)", >>>>>> "timestamp", "topic").writeStream \ >>>>>> .outputMode("update") \ >>>>>> .trigger(processingTime='10 minutes') \ >>>>>> .option("truncate", "false") \ >>>>>> .option("checkpointLocation", self.checkpoint) \ >>>>>> .foreachBatch(self.process) \ >>>>>> .start() >>>>>> >>>>>> >>>>>> self.process - where i do the bulk of the processing, which calls the >>>>>> function 'aggIntfLogs' >>>>>> >>>>>> In function aggIntfLogs - i'm using watermark of 15 mins, and doing >>>>>> groupBy to calculate the sum of sentOctets & recvdOctets >>>>>> >>>>>> >>>>>> def aggIntfLogs(ldf): >>>>>> if ldf and ldf.count() > 0: >>>>>> >>>>>> ldf = ldf.select('applianceName', 'timeslot', >>>>>> 'sentOctets', 'recvdOctets','ts', 'customer') \ >>>>>> .withColumn('sentOctets', >>>>>> ldf["sentOctets"].cast(LongType())) \ >>>>>> .withColumn('recvdOctets', >>>>>> ldf["recvdOctets"].cast(LongType())) \ >>>>>> .withWatermark("ts", "15 minutes") >>>>>> >>>>>> ldf = ldf.groupBy("applianceName", "timeslot", >>>>>> "customer", >>>>>> >>>>>> window(col("ts"), "15 minutes")) \ >>>>>> .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \ >>>>>> .withColumnRenamed('sum(sentOctets)', 'sentOctets') \ >>>>>> .withColumnRenamed('sum(recvdOctets)', >>>>>> 'recvdOctets') \ >>>>>> .fillna(0) >>>>>> return ldf >>>>>> return ldf >>>>>> >>>>>> >>>>>> Dataframe 'ldf' returned from the function aggIntfLogs - is >>>>>> written to Kafka topic >>>>>> >>>>>> ``` >>>>>> >>>>>> I was expecting that using the watermark will account for late coming >>>>>> data .. i.e. the sentOctets & recvdOctets are calculated for the >>>>>> consolidated data >>>>>> (including late-coming data, since the late coming data comes within >>>>>> 15 mins), however, I'm seeing 2 records for some of the data (i.e. key - >>>>>> applianceName/timeslot/customer) i.e. the aggregated data is calculated >>>>>> individually for the records and I see 2 records instead of single record >>>>>> accounting for late coming data within watermark. >>>>>> >>>>>> What needs to be done to fix this & make this work as desired? >>>>>> >>>>>> tia! >>>>>> >>>>>> >>>>>> Here is the Stackoverflow link as well - >>>>>> >>>>>> >>>>>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected >>>>>> >>>>>> >>>>>> >>>>>>