Hello,
I'm seeing some strange behaviour in Flink SQL where adding a new SELECT
statement causes a previously created interval join to be changed into a
regular join. I'm concerned because the Flink docs make clear that regular
joins keep both inputs in state indefinitely, so their state can grow
without bound.
I have put a worked example in https://github.com/mnuttall/flink-debug. I
have an interval join,
CREATE TEMPORARY VIEW suspiciousOrders AS
SELECT s.orderId, s.customer, s.product, s.quantity AS order_quantity,
       l.cancel_quantity, l.order_ts AS large_ts, s.ts AS small_ts, l.cancel_ts
FROM smallOrders s JOIN largeCancellations l
  ON s.product = l.product AND s.customer = l.customer
WHERE s.ts BETWEEN l.cancel_ts - INTERVAL '1' DAY AND l.cancel_ts;
which evaluates to
[13]:IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-8640, leftUpperBound=0, leftTimeIndex=0,
rightTimeIndex=1], where=[((product = product0) AND (customer = customer0)
AND (ts >= (cancel_ts - 8640:INTERVAL DAY)) AND (ts <=
cancel_ts))], select=[ts, orderId, customer, product, quantity, order_ts,
cancel_ts, product0, customer0, cancel_quantity])
+- [14]:Calc(select=[orderId, customer, product, quantity AS
order_quantity, cancel_quantity, order_ts AS large_ts, ts AS small_ts,
cancel_ts])
+- [15]:ConstraintEnforcer[NotNullEnforcer(fields=[order_quantity,
cancel_quantity])]
+- Sink: Collect table sink
but adding a further temporary view
CREATE TEMPORARY VIEW filteredResults AS
SELECT * FROM suspiciousOrders WHERE small_ts > large_ts;
changes the interval join to a regular join,
[13]:Join(joinType=[InnerJoin], where=[((product = product0) AND (customer
= customer0) AND (ts >= (cancel_ts - 8640:INTERVAL DAY)) AND (ts
<= cancel_ts) AND (ts > order_ts))], select=[ts, orderId, customer,
product, quantity, order_ts, cancel_ts, product0, customer0,
cancel_quantity], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+- [14]:Calc(select=[orderId, customer, product, quantity AS
order_quantity, cancel_quantity, order_ts AS large_ts, ts AS small_ts,
cancel_ts])
+- [15]:ConstraintEnforcer[NotNullEnforcer(fields=[order_quantity,
cancel_quantity])]
+- Sink: Collect table sink
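In case it helps anyone reproduce this without running the job, the
optimized plan can also be inspected directly. This is only a minimal
sketch, assuming both views above are registered in the same session. The
EXPLAIN output is formatted differently from the operator names I pasted
above, but I would expect it to show the same IntervalJoin versus Join
difference.

```sql
-- Plan for the original view: I expect an IntervalJoin node here.
EXPLAIN PLAN FOR
SELECT * FROM suspiciousOrders;

-- Plan for the view with the extra filter: this is where I expect
-- the regular Join node to show up instead.
EXPLAIN PLAN FOR
SELECT * FROM filteredResults;
```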
Please can someone explain what's happening here? It looks as though my
(safe) interval join is being converted to an (unsafe) regular join - is
that true?
Many thanks in advance.
Regards,
Mark Nuttall