Hi,

I am building a simple POC with Flink SQL and am running into an issue with
an interval join.

*Use case*: I have two Kafka streams, and with a Flink SQL interval join I
want to remove rows from *stream 1* (abandoned_user_visits) that have a
matching row in *stream 2* (orders) within a ±30-minute interval.

*Data:*
1) *Abandoned user visits.* Sample data:
{"key1": "123", "email": "ema...@example.com", "abandoned_pids": [674378611, 1754171520], "ts": "2021-03-18 11:00:00.208"}
{"key1": "234", "email": "ema...@example.com", "abandoned_pids": [1942367711], "ts": "2021-03-18 11:45:00.208"}
{"key1": "123", "email": "ema...@example.com", "abandoned_pids": [1754171520], "ts": "2021-03-18 12:00:00.208"}
{"key1": "234", "email": "ema...@example.com", "abandoned_pids": [1942367711], "ts": "2021-03-18 12:45:00.208"}

2) *User order stream.* Sample data:
{"key1": "234", "email": "ema...@example.com", "pids": [1754171520], "ts": "2021-03-18 11:55:00.208"}
{"key1": "123", "email": "ema...@example.com", "pids": [674378611, 1754171520], "ts": "2021-03-18 12:10:00.208"}

When I push the above records to Kafka and select from the view below, the
result is effectively an *INNER* join, not an OUTER join.
I even tried posting a single record to stream (1) and no record to stream
(2), expecting that record to be emitted, but nothing was emitted.
Interestingly, when I use processing time instead of event time, I get the
expected results.

*Tables and views used:*
CREATE TABLE abandoned_visits (
        key1 STRING
      , email STRING
      , ts TIMESTAMP(3)
      , abandoned_pids ARRAY<BIGINT>
      , WATERMARK FOR ts AS ts
)
WITH (
  'connector' = 'kafka',
  'topic' = 'abandoned-visits',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);

CREATE TABLE orders (
        key1 STRING
      , email STRING
      , ts TIMESTAMP(3)
      , pids ARRAY<BIGINT>
      , WATERMARK FOR ts AS ts
)
WITH (
    'connector' = 'kafka',
    'topic'     = 'orders',
    'properties.bootstrap.servers' = '...',
    'format'    = 'json'
);

CREATE VIEW abandoned_visits_with_no_orders AS
  SELECT
      av.key1
    , av.email
    , av.abandoned_pids
    , FLOOR(av.ts TO MINUTE)    AS visit_timestamp
    , FLOOR(o.ts TO MINUTE)     AS order_timestamp
    , o.email                   AS order_email
  FROM abandoned_visits av
  FULL OUTER JOIN orders o
  ON  av.key1 = o.key1
  AND av.email = o.email
  AND o.ts BETWEEN av.ts - INTERVAL '30' MINUTE AND av.ts + INTERVAL '30' MINUTE
--  WHERE
--    o.email IS NULL   -- commented out so that the result is not empty
;

*Result:*
select * from abandoned_visits_with_no_orders;

This returns the same result as an inner join: there are no rows with NULL
order columns.
I would appreciate any help.

Thanks,
Aneesha
