Scott Schenkein created SPARK-48023: ---------------------------------------
Summary: Stream-stream join with windowed drop-duplicates suppresses all rows Key: SPARK-48023 URL: https://issues.apache.org/jira/browse/SPARK-48023 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.5.0 Reporter: Scott Schenkein When applying a streaming dropDuplicates-with-watermark to a self-referential stream-stream left-join with watermark, the dropDuplicates drops all rows. If the watermark is eliminated from the dropDuplicates, the query behaves as expected. The code below demonstrates the error: {code:java} # # 1. Generate the test data # size = 1000 step_secs = 300 event_offset_secs = 240 # Add some lines to not left join skips = set() skips.add(3) skips.add(18) skips.add(800) def lit_ts(secs): return datetime.datetime.fromtimestamp(secs) data = [] base_time = time.time() for x in range(size): ts = base_time + (step_secs * x) data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)}) if x not in skips: data.append( { "event_id": f"two_{x}", "join_id": x, "ts": lit_ts(ts - event_offset_secs), } ) # Add duplicates to validate the dropDuplicates for i in range(len(data)): data.append(data[i].copy()) # # 2. Write the results so we can stream # path = "/tmp/bugtest" df = spark.createDataFrame(data, schema="event_id string, join_id string, ts TIMESTAMP") df.repartition(1).write.format("delta").mode("overwrite").save(path) df = spark.read.format("delta").load(path) df.createOrReplaceTempView("test_data") # # 3. Define the test query # sql = """ with also_test_data as ( select * from test_data where event_id like 'two%' ) select td.* from test_data td left join also_test_data on ( td.join_id = also_test_data.join_id and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES and also_test_data.ts <= td.ts ) where td.event_id like 'one%' and also_test_data.event_id is NULL """ # # 4. 
Run it non-streaming, without deduplication, to validate the expected results # res = spark.sql(sql) print("Static query") res.show(truncate=False) # +--------+-------+-------------------------+ # |event_id|join_id|ts | # +--------+-------+-------------------------+ # |one_3 |3 |2024-04-27 17:36:54.97927| # |one_18 |18 |2024-04-27 18:51:54.97927| # |one_800 |800 |2024-04-30 12:01:54.97927| # |one_3 |3 |2024-04-27 17:36:54.97927| # |one_18 |18 |2024-04-27 18:51:54.97927| # |one_800 |800 |2024-04-30 12:01:54.97927| # +--------+-------+-------------------------+ # # 5. Run it as a stream, with no dropDuplicates # def write_stream(res): ( res.writeStream.outputMode("append") .trigger(availableNow=True) .format("console") .start() .awaitTermination() ) sdf = spark.readStream.format("delta").load(path).withWatermark('ts', '15 minutes') sdf.createOrReplaceTempView("test_data") res = spark.sql(sql) write_stream(res) # Batch: 1 # ------------------------------------------- # +--------+-------+--------------------+ # |event_id|join_id| ts| # +--------+-------+--------------------+ # | one_800| 800|2024-04-30 12:01:...| # | one_800| 800|2024-04-30 12:01:...| # | one_3| 3|2024-04-27 17:36:...| # | one_3| 3|2024-04-27 17:36:...| # | one_18| 18|2024-04-27 18:51:...| # | one_18| 18|2024-04-27 18:51:...| # +--------+-------+--------------------+ # # 6. 
Run it as a stream with dropDuplicates, but no extra watermark # sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes") sdf.createOrReplaceTempView("test_data") res = spark.sql(sql) res = ( (res.select(window("ts", "30 minutes"), "*")) .dropDuplicates(["event_id"]).drop("window") ) write_stream(res) # ------------------------------------------- # Batch: 1 # ------------------------------------------- # +--------+-------+--------------------+ # |event_id|join_id| ts| # +--------+-------+--------------------+ # | one_800| 800|2024-04-30 12:01:...| # | one_18| 18|2024-04-27 18:51:...| # | one_3| 3|2024-04-27 17:36:...| # +--------+-------+--------------------+ # # 7. Run it as a stream with dropDuplicates, using a watermark # THIS is where we see the error # sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes") sdf.createOrReplaceTempView("test_data") res = spark.sql(sql) res = ( (res.select(window("ts", "30 minutes"), "*")) .withWatermark("window", '30 minutes') .dropDuplicates(["event_id"]).drop("window") ) write_stream(res) print("END") # ------------------------------------------- # Batch: 0 # ------------------------------------------- # +--------+-------+---+------------+ # |event_id|join_id| ts|detection_ts| # +--------+-------+---+------------+ # +--------+-------+---+------------+ # END{code} -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org