Hi, I am doing a simple POC using Flink SQL and I am facing some issues with Interval Join.
*Use case*: I have two Kafka streams, and I want to use a Flink SQL interval join to remove rows from *stream 1* (abandoned_user_visits) that have a matching row in *stream 2* (orders) within some time interval.

*Data:*

1) *Abandoned user visits.* Sample data:
{"key1": "123", "email": "ema...@example.com", "abandoned_pids": [674378611, 1754171520], "ts": "2021-03-18 11:00:00.208"}
{"key1": "234", "email": "ema...@example.com", "abandoned_pids": [1942367711], "ts": "2021-03-18 11:45:00.208"}
{"key1": "123", "email": "ema...@example.com", "abandoned_pids": [1754171520], "ts": "2021-03-18 12:00:00.208"}
{"key1": "234", "email": "ema...@example.com", "abandoned_pids": [1942367711], "ts": "2021-03-18 12:45:00.208"}

2) *User order stream.* Sample data:
{"key1": "234", "email": "ema...@example.com", "pids": [1754171520], "ts": "2021-03-18 11:55:00.208"}
{"key1": "123", "email": "ema...@example.com", "pids": [674378611, 1754171520], "ts": "2021-03-18 12:10:00.208"}

When I push the above records to Kafka and select from the view below, I get a result that is effectively an *INNER* join (not an OUTER join). I even tried posting just one record to stream 1 and no records to stream 2, expecting that record to be emitted, but nothing was emitted. Interestingly, when I use processing time instead of event time, I get the expected results.
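The processing-time setup is not shown in this post. As a sketch only, a processing-time version of the view below might look like the following, assuming each table gets a computed `PROCTIME()` column instead of a watermark. The names `abandoned_visits_pt`, `orders_pt`, `proc_time`, and `abandoned_visits_with_no_orders_pt` are hypothetical; the connector options simply mirror the event-time tables below.

```sql
-- Hypothetical processing-time variant; table, view, and column names are
-- illustrative only and not taken from the original setup.
CREATE TABLE abandoned_visits_pt (
    key1 STRING
  , email STRING
  , ts TIMESTAMP(3)                -- regular column here, no WATERMARK declared
  , abandoned_pids ARRAY<BIGINT>
  , proc_time AS PROCTIME()        -- processing-time attribute
) WITH (
    'connector' = 'kafka',
    'topic' = 'abandoned-visits',
    'properties.bootstrap.servers' = '...',
    'format' = 'json'
);

CREATE TABLE orders_pt (
    key1 STRING
  , email STRING
  , ts TIMESTAMP(3)
  , pids ARRAY<BIGINT>
  , proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = '...',
    'format' = 'json'
);

CREATE VIEW abandoned_visits_with_no_orders_pt AS
SELECT
    av.key1
  , av.email
  , av.abandoned_pids
  , FLOOR(av.ts TO MINUTE) AS visit_timestamp
  , FLOOR(o.ts TO MINUTE) AS order_timestamp
  , o.email AS order_email
FROM abandoned_visits_pt av
FULL OUTER JOIN orders_pt o
  ON av.key1 = o.key1
  AND av.email = o.email
  -- the interval bounds now compare processing-time attributes, not ts
  AND o.proc_time BETWEEN av.proc_time - INTERVAL '30' MINUTE
                      AND av.proc_time + INTERVAL '30' MINUTE;
```

In this sketch the 30-minute bounds are evaluated against the time each record is processed, not against the `ts` values in the JSON records, so its output for the sample data is not directly comparable to the event-time view.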
*Tables and views used:*

CREATE TABLE abandoned_visits (
    key1 STRING
  , email STRING
  , ts TIMESTAMP(3)
  , abandoned_pids ARRAY<BIGINT>
  , WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'kafka',
    'topic' = 'abandoned-visits',
    'properties.bootstrap.servers' = '...',
    'format' = 'json'
);

CREATE TABLE orders (
    key1 STRING
  , email STRING
  , ts TIMESTAMP(3)
  , pids ARRAY<BIGINT>
  , WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = '...',
    'format' = 'json'
);

CREATE VIEW abandoned_visits_with_no_orders AS
SELECT
    av.key1
  , av.email
  , av.abandoned_pids
  , FLOOR(av.ts TO MINUTE) AS visit_timestamp
  , FLOOR(o.ts TO MINUTE) AS order_timestamp
  , o.email AS order_email
FROM abandoned_visits av
FULL OUTER JOIN orders o
  ON av.key1 = o.key1
  AND av.email = o.email
  AND o.ts BETWEEN av.ts - INTERVAL '30' MINUTE AND av.ts + INTERVAL '30' MINUTE
-- WHERE
--   o.email IS NULL   -- commented out so that the result is not empty
;

*Result:*

SELECT * FROM abandoned_visits_with_no_orders;

This gives the same result as an inner join: there are no rows with NULL order data.

I would appreciate any help.

Thanks,
Aneesha