Scott Schenkein created SPARK-48023:
---------------------------------------

             Summary: Stream-stream join with windowed drop-duplicates suppresses all rows
                 Key: SPARK-48023
                 URL: https://issues.apache.org/jira/browse/SPARK-48023
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.5.0
            Reporter: Scott Schenkein


When applying a streaming dropDuplicates with a watermark on top of a 
self-referential stream-stream left join (itself watermarked), the 
dropDuplicates suppresses all rows. If the watermark is removed from the 
dropDuplicates step, the query behaves as expected.
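
The only delta between the working and failing variants is the extra watermark on the deduplication input. Schematically (res is the joined streaming DataFrame from the full repro below; ok/broken are just illustrative names):

{code:python}
from pyspark.sql.functions import window

# Works: windowed dropDuplicates with no extra watermark
ok = (
    res.select(window("ts", "30 minutes"), "*")
    .dropDuplicates(["event_id"])
    .drop("window")
)

# Returns zero rows: identical except for the watermark on the window column
broken = (
    res.select(window("ts", "30 minutes"), "*")
    .withWatermark("window", "30 minutes")
    .dropDuplicates(["event_id"])
    .drop("window")
)
{code}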


The code below demonstrates the error:

{code:python}
import datetime
import time

from pyspark.sql.functions import window

#
# 1. Generate the test data
#
size = 1000
step_secs = 300
event_offset_secs = 240
# Omit the matching "two_" row for these ids so they survive the anti-join
skips = set()
skips.add(3)
skips.add(18)
skips.add(800)

def lit_ts(secs):
    return datetime.datetime.fromtimestamp(secs)

data = []
base_time = time.time()
for x in range(size):
    ts = base_time + (step_secs * x)
    data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
    if x not in skips:
        data.append(
            {
                "event_id": f"two_{x}",
                "join_id": x,
                "ts": lit_ts(ts - event_offset_secs),
            }
        )

# Add duplicates to validate the dropDuplicates
for i in range(len(data)):
    data.append(data[i].copy())

#
# 2. Write the results so we can stream
#
path = "/tmp/bugtest"
df = spark.createDataFrame(data, schema="event_id string, join_id long, ts timestamp")
df.repartition(1).write.format("delta").mode("overwrite").save(path)


df = spark.read.format("delta").load(path)
df.createOrReplaceTempView("test_data")
#
# 3. Define the test query
#
sql = """
with also_test_data as (
   select * from test_data
   where event_id like 'two%'
)
select td.* from test_data td
left join also_test_data on (
   td.join_id = also_test_data.join_id
   and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
   and also_test_data.ts <= td.ts
)
where td.event_id like 'one%'
and also_test_data.event_id is NULL
"""
#
# 4. Run it non-streaming, without deduplication, to validate results
#
res = spark.sql(sql)
print("Static query")
res.show(truncate=False)
# +--------+-------+-------------------------+
# |event_id|join_id|ts                       |
# +--------+-------+-------------------------+
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# +--------+-------+-------------------------+


#
# 5. Run it as a stream without dropDuplicates
#
def write_stream(res):
    (   
        res.writeStream.outputMode("append")
        .trigger(availableNow=True)
        .format("console")
        .start()
        .awaitTermination()
    )
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
write_stream(res)
# Batch: 1
# -------------------------------------------
# +--------+-------+--------------------+
# |event_id|join_id|                  ts|
# +--------+-------+--------------------+
# | one_800|    800|2024-04-30 12:01:...|
# | one_800|    800|2024-04-30 12:01:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |  one_18|     18|2024-04-27 18:51:...|
# +--------+-------+--------------------+

#
# 6. Run it as a stream with dropDuplicates, but no extra watermark
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    res.select(window("ts", "30 minutes"), "*")
    .dropDuplicates(["event_id"])
    .drop("window")
)
write_stream(res)
# -------------------------------------------                                 
# Batch: 1
# -------------------------------------------
# +--------+-------+--------------------+
# |event_id|join_id|                  ts|
# +--------+-------+--------------------+
# | one_800|    800|2024-04-30 12:01:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |   one_3|      3|2024-04-27 17:36:...|
# +--------+-------+--------------------+

#
# 7. Run it as a stream with dropDuplicates, using a watermark
#     THIS is where we see the error
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    res.select(window("ts", "30 minutes"), "*")
    .withWatermark("window", "30 minutes")
    .dropDuplicates(["event_id"])
    .drop("window")
)
write_stream(res) 
print("END")
# -------------------------------------------                                  
# Batch: 0
# -------------------------------------------
# +--------+-------+---+
# |event_id|join_id| ts|
# +--------+-------+---+
# +--------+-------+---+
# END
{code}
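
For reference, Spark 3.5.0 also adds dropDuplicatesWithinWatermark, which deduplicates within the event-time watermark without the helper window column. A sketch only (not verified against this bug), reusing sdf/sql/write_stream from above:

{code:python}
# Sketch (unverified): deduplicate within the existing event-time watermark
# instead of adding a second watermark on a helper window column.
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql).dropDuplicatesWithinWatermark(["event_id"])
write_stream(res)
{code}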