FYI, the Apache Spark version is 3.1.3.

On Wed, Mar 15, 2023 at 4:34 PM karan alang <karan.al...@gmail.com> wrote:
Hi Mich, this doesn't seem to be working for me .. the watermark seems to be getting ignored!

Here is the data put into Kafka:

```
+----------------------------------------------------------------------------------------------------+----+
|value                                                                                                |key |
+----------------------------------------------------------------------------------------------------+----+
|{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"} |null|
|{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"} |null|
|{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"} |null|
|{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"}  |null|
+----------------------------------------------------------------------------------------------------+----+
```

Note: insert_ts specifies when the data was inserted.

Here is the output of the Structured Stream:

```
-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
+-------------------+-------------------+---------------+
|2023-03-15 16:10:00|2023-03-15 16:15:00|27             |
|2023-03-15 15:10:00|2023-03-15 15:15:00|14             |
|2023-03-13 10:10:00|2023-03-13 10:15:00|6              |
+-------------------+-------------------+---------------+
```

Note: I'm summing up the temperatures (for easy verification).

As per the above, all three 'ts' values are included in the DataFrame, even though I added "ts":"2023-03-13T10:12:00.000-07:00" as the last record. Since the watermark is set to "5 minutes" and max(ts) == 2023-03-15T16:12:00.000-07:00, the record with ts = "2023-03-13T10:12:00.000-07:00" should have been dropped, as it is more than 2 days old (i.e. dated 2023-03-13)!

Any ideas what needs to be changed to make this work?

Here is the code (modified for my requirement, but essentially the same):

```
schema = StructType([
    StructField("temparature", LongType(), False),
    StructField("ts", TimestampType(), False),
    StructField("insert_ts", TimestampType(), False)
])

streamingDataFrame = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("group.id", 'watermark-grp') \
    .option("subscribe", topic) \
    .option("failOnDataLoss", "false") \
    .option("includeHeaders", "true") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema=schema).alias("parsed_value"))

resultC = streamingDataFrame.select(
    col("parsed_value.ts").alias("timestamp"),
    col("parsed_value.temparature").alias("temparature"),
    col("parsed_value.insert_ts").alias("insert_ts"))

resultM = resultC. \
    withWatermark("timestamp", "5 minutes"). \
    groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
    agg({'temparature': 'sum'})

resultMF = resultM. \
    select(col("window.start").alias("startOfWindowFrame"),
           col("window.end").alias("endOfWindowFrame"),
           col("sum(temparature)").alias("Sum_Temperature"))

result = resultMF. \
    writeStream. \
    outputMode('complete'). \
    option("numRows", 1000). \
    option("truncate", "false"). \
    format('console'). \
    option('checkpointLocation', checkpoint_path). \
    queryName("sum_temparature"). \
    start()

result.awaitTermination()
```
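For a self-contained reference, below is a minimal sketch of the same watermark plus tumbling-window aggregation that can be run locally without Kafka, using the built-in rate source (the derived temperature column and the console sink are illustrative). One thing to note: the query above runs with outputMode('complete'), and in complete mode the entire result table is re-emitted on every trigger, so the watermark does not drop old windows there; watermark-based dropping of late rows and state cleanup applies in append and update modes, which is what this sketch uses.

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("watermark-append-sketch").getOrCreate()

# The built-in rate source emits (timestamp, value) rows; derive a fake temperature from value
events = spark.readStream.format("rate").option("rowsPerSecond", 5).load() \
    .withColumn("temparature", (col("value") % 30).cast("long"))

# Same pattern as above: 5-minute watermark, 5-minute tumbling window, SUM(temparature)
resultM = events \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window(col("timestamp"), "5 minutes", "5 minutes")) \
    .agg({'temparature': 'sum'})

resultMF = resultM.select(
    col("window.start").alias("startOfWindowFrame"),
    col("window.end").alias("endOfWindowFrame"),
    col("sum(temparature)").alias("Sum_Temperature"))

# In append mode a window is emitted only once, after the watermark passes the window end,
# and rows arriving later than the watermark are dropped from the aggregation state
result = resultMF.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

result.awaitTermination()
```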
On Sun, Mar 12, 2023 at 3:13 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

OK

ts is the timestamp, right?

This is similar code that works out the average temperature over a time frame of 5 minutes.

Note the comments, and the errors caught with try:

```
try:
    # construct a streaming dataframe streamingDataFrame that subscribes to topic temperature
    streamingDataFrame = self.spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
        .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
        .option("group.id", config['common']['appName']) \
        .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
        .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
        .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
        .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
        .option("subscribe", "temperature") \
        .option("failOnDataLoss", "false") \
        .option("includeHeaders", "true") \
        .option("startingOffsets", "latest") \
        .load() \
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

    resultC = streamingDataFrame.select( \
        col("parsed_value.rowkey").alias("rowkey") \
        , col("parsed_value.timestamp").alias("timestamp") \
        , col("parsed_value.temperature").alias("temperature"))

    """
    We work out the window and the AVG(temperature) in the window's timeframe below.
    This should return the following Dataframe as struct:

    root
     |-- window: struct (nullable = false)
     |    |-- start: timestamp (nullable = true)
     |    |-- end: timestamp (nullable = true)
     |-- avg(temperature): double (nullable = true)
    """
    resultM = resultC. \
        withWatermark("timestamp", "5 minutes"). \
        groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
        avg('temperature')

    # We take the above Dataframe and flatten it to get the columns aliased as
    # "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
    resultMF = resultM. \
        select( \
            F.col("window.start").alias("startOfWindowFrame") \
            , F.col("window.end").alias("endOfWindowFrame") \
            , F.col("avg(temperature)").alias("AVGTemperature"))

    resultMF.printSchema()

    result = resultMF. \
        writeStream. \
        outputMode('complete'). \
        option("numRows", 1000). \
        option("truncate", "false"). \
        format('console'). \
        option('checkpointLocation', checkpoint_path). \
        queryName("temperature"). \
        start()

except Exception as e:
    print(f"""{e}, quitting""")
    sys.exit(1)
```

HTH
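Since window() also works on static DataFrames, the groupBy/avg logic in the example above can be sanity-checked on a few hand-made rows before wiring it up to Kafka. A small sketch (the sample rows and values are made up):

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, window

spark = SparkSession.builder.appName("window-batch-check").getOrCreate()

# made-up rows matching the (rowkey, timestamp, temperature) shape used above
rows = [
    ("r1", "2023-03-12 10:01:00", 20.0),
    ("r2", "2023-03-12 10:03:00", 24.0),
    ("r3", "2023-03-12 10:07:00", 18.0),
]
df = spark.createDataFrame(rows, ["rowkey", "timestamp", "temperature"]) \
    .withColumn("timestamp", col("timestamp").cast("timestamp"))

# Same 5-minute tumbling window and AVG(temperature), flattened the same way
check = df.groupBy(window(col("timestamp"), "5 minutes", "5 minutes")) \
    .agg(avg("temperature").alias("AVGTemperature")) \
    .select(col("window.start").alias("startOfWindowFrame"),
            col("window.end").alias("endOfWindowFrame"),
            col("AVGTemperature"))

check.show(truncate=False)
```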
On Sat, 11 Mar 2023 at 04:33, karan alang <karan.al...@gmail.com> wrote:

Hi Mich -
Here is the output of the ldf.printSchema() & ldf.show() commands.

ldf.printSchema():

```
root
 |-- applianceName: string (nullable = true)
 |-- timeslot: long (nullable = true)
 |-- customer: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- sentOctets: long (nullable = true)
 |-- recvdOctets: long (nullable = true)
```

ldf.show():

```
+-------------+--------+--------+------------------------------------------+----------+-----------+
|applianceName|timeslot|customer|window                                    |sentOctets|recvdOctets|
+-------------+--------+--------+------------------------------------------+----------+-----------+
|abc1         |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|21459264  |32211859   |
|pqrq         |2797513 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|17775527  |31331093   |
|xyz          |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|12808015  |24191707   |
+-------------+--------+--------+------------------------------------------+----------+-----------+
```

Also, any comment on the outputMode? I've set it to 'update', since I'm using aggregation.

thanks!
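Since ldf carries the window struct column shown in the printSchema() above and, per the original question below, is eventually written to a Kafka topic, the window usually has to be flattened and the row serialized into the Kafka value. A hedged sketch of one way to do that (the key layout, topic name and broker address are made up for illustration):

```
from pyspark.sql.functions import col, concat_ws, struct, to_json

# Flatten the window struct and build Kafka key/value columns; names below are illustrative
out = ldf.select(
    concat_ws("|", col("applianceName"), col("customer"),
              col("timeslot").cast("string")).alias("key"),
    to_json(struct(
        col("applianceName"), col("timeslot"), col("customer"),
        col("window.start").alias("windowStart"),
        col("window.end").alias("windowEnd"),
        col("sentOctets"), col("recvdOctets"))).alias("value"))

# Inside foreachBatch this is a plain batch write to Kafka
out.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092") \
    .option("topic", "agg-intf-logs") \
    .save()
```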
On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Just looking at the code, in here:

```
ldf = ldf.groupBy("applianceName", "timeslot", "customer",
                  window(col("ts"), "15 minutes")) \
    .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
    .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
    .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
    .fillna(0)
```

What does ldf.printSchema() return?

HTH

On Fri, 10 Mar 2023 at 07:16, karan alang <karan.al...@gmail.com> wrote:

Hello All -

I have a Structured Streaming job which has a trigger of 10 minutes, and I'm using a watermark to account for late data coming in. However, the watermark is not working - instead of a single record with the total aggregated value, I see 2 records.

Here is the code:

```
# 1) Structured Streaming - reading from Kafka every 10 mins

df_stream = self.spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
    .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
    .option("kafka.bootstrap.servers", self.kafkaBrokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("kafka.metadata.max.age.ms", "1000") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .load()

# 2) Calling foreachBatch(self.process)
# note - outputMode is set to "update" (tried setting outputMode = append as well)

# 03/09 ::: outputMode - update instead of append
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
    .outputMode("update") \
    .trigger(processingTime='10 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", self.checkpoint) \
    .foreachBatch(self.process) \
    .start()
```

self.process is where I do the bulk of the processing; it calls the function 'aggIntfLogs'.

In the function aggIntfLogs, I'm using a watermark of 15 mins and doing a groupBy to calculate the sum of sentOctets & recvdOctets:

```
def aggIntfLogs(ldf):
    if ldf and ldf.count() > 0:
        ldf = ldf.select('applianceName', 'timeslot', 'sentOctets', 'recvdOctets', 'ts', 'customer') \
            .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
            .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
            .withWatermark("ts", "15 minutes")

        ldf = ldf.groupBy("applianceName", "timeslot", "customer",
                          window(col("ts"), "15 minutes")) \
            .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
            .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
            .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
            .fillna(0)
        return ldf
    return ldf
```

The Dataframe 'ldf' returned from the function aggIntfLogs is written to a Kafka topic.

I was expecting that using the watermark would account for late-coming data, i.e. that sentOctets & recvdOctets would be calculated over the consolidated data (including the late-coming data, since it arrives within 15 mins). However, for some of the data (i.e. key - applianceName/timeslot/customer) the aggregation is calculated individually per record, and I see 2 records instead of a single record accounting for the late-coming data within the watermark.

What needs to be done to fix this & make this work as desired?

tia!

Here is the Stackoverflow link as well -

https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
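One point worth illustrating about the pattern above: foreachBatch hands self.process a static DataFrame for each micro-batch, and withWatermark is a no-op on a non-streaming DataFrame, so the aggregation inside aggIntfLogs is computed per trigger with no state carried across triggers - which is consistent with seeing a second record for the same key/window when late data lands in a later batch. Below is a hedged sketch of performing the watermarked, windowed aggregation on the streaming DataFrame itself; the JSON payload schema and the console sink are assumptions (the real job parses its own payload and writes the result to Kafka), and df_stream / self.checkpoint are taken from the code above.

```
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import (LongType, StringType, StructField,
                               StructType, TimestampType)

# assumed payload schema, matching the fields used in aggIntfLogs
schema = StructType([
    StructField("applianceName", StringType(), True),
    StructField("timeslot", LongType(), True),
    StructField("customer", StringType(), True),
    StructField("sentOctets", LongType(), True),
    StructField("recvdOctets", LongType(), True),
    StructField("ts", TimestampType(), True),
])

parsed = df_stream \
    .select(from_json(col("value").cast("string"), schema).alias("p")) \
    .select("p.*")

# Watermark + window applied to the *streaming* DataFrame, so rows arriving within
# 15 minutes of the watermark are merged into the existing window state across triggers
agg = parsed \
    .withWatermark("ts", "15 minutes") \
    .groupBy("applianceName", "timeslot", "customer", window(col("ts"), "15 minutes")) \
    .agg({'sentOctets': 'sum', 'recvdOctets': 'sum'}) \
    .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
    .withColumnRenamed('sum(recvdOctets)', 'recvdOctets')

query = agg.writeStream \
    .outputMode("update") \
    .trigger(processingTime='10 minutes') \
    .option("checkpointLocation", self.checkpoint) \
    .format("console") \
    .option("truncate", "false") \
    .start()
```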