You can check the plan after the optimization phase to verify whether it is a regular join or a time-bounded join (the latter should contain a WindowJoin node). The most direct way is to set a breakpoint in the optimization phase [1][2]. You can also create an ITCase with your TestData for debugging [3].
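As a lighter-weight alternative to a debugger breakpoint, the optimized plan can also be inspected as a string (in Flink 1.8 via `tableEnv.explain(table)`) and checked for the join operator name. A minimal sketch; the `tEnv`/`result` variables and the exact plan text are assumptions, but a time-bounded join is expected to show up as "WindowJoin" in the physical plan, a regular join as plain "Join":

```java
public class PlanCheck {
    // A time-bounded (interval) join appears as "WindowJoin" in the
    // physical plan; a regular join shows a plain "Join" operator.
    static boolean isTimeBounded(String plan) {
        return plan.contains("WindowJoin");
    }

    public static void main(String[] args) {
        // Inside a Flink job you would obtain the plan like this
        // (assumes a StreamTableEnvironment tEnv and a Table result):
        //   String plan = tEnv.explain(result);

        // Illustrative plan fragments (not real Flink output):
        String windowed = "Stage: ...\n  WindowJoin(where: [...])";
        String regular  = "Stage: ...\n  Join(joinType=[InnerJoin])";

        System.out.println(isTimeBounded(windowed)); // prints "true"
        System.out.println(isTimeBounded(regular));  // prints "false"
    }
}
```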
[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala#L148
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/StreamOptimizer.scala#L68
[3] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala

*Best Regards,*
*Zhenghua Gao*

On Mon, Aug 12, 2019 at 10:49 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi there,
>
> Currently, I'm trying to write a SQL query which shall execute a
> time-windowed/bounded JOIN on two data streams.
>
> Suppose I have stream1 with attributes id, ts, user and stream2 with
> attributes id, ts, userName. I want to receive the natural JOIN of both
> streams for events of the same day.
>
> In Oracle (with a ts column as NUMBER instead of TIMESTAMP, for
> historical reasons), I do the following:
>
> SELECT *
> FROM STREAM1
> JOIN STREAM2 ON STREAM1."user" = STREAM2."userName"
>   AND TRUNC(TO_DATE('19700101', 'YYYYMMDD') + (1 / 24 / 60 / 60 / 1000) * STREAM1."ts")
>     = TRUNC(TO_DATE('19700101', 'YYYYMMDD') + (1 / 24 / 60 / 60 / 1000) * STREAM2."ts");
>
> which yields 294 rows with my test data (14 elements from stream1 match
> 21 elements in stream2 on the one day of test data). Now I want to run
> the same query in Flink, so I registered both streams as tables and
> properly registered the event time (by specifying ts.rowtime as a table
> column).
>
> My goal is to produce a time-windowed JOIN so that, once both streams
> advance their watermarks far enough, an element is written out into an
> append-only stream.
>
> First try (to conform to the time-bounded-JOIN conditions):
>
> SELECT s1.id, s2.id
> FROM STREAM1 AS s1
> JOIN STREAM2 AS s2
>   ON s1.`user` = s2.userName
>   AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
>   AND s2.ts BETWEEN s1.ts - INTERVAL '24' HOUR AND s1.ts + INTERVAL '24' HOUR
>   AND TUMBLE_START(s1.ts, INTERVAL '1' DAY) = TUMBLE_START(s2.ts, INTERVAL '1' DAY) -- restrict to matches on the same day
>
> This yielded the exception "Rowtime attributes must not be in the input
> rows of a regular join. As a workaround you can cast the time attributes
> of input tables to TIMESTAMP before.". So I'm still in the area of
> regular joins, not time-windowed JOINs, even though I wrote the explicit
> BETWEEN for both input streams!
>
> Then I found [1], which is exactly my query but without the last
> condition (restrict to matches on the same day). I tried that one as
> well, just to have a starting point, but the error is the same.
>
> I then reduced the condition to just one time bound:
>
> SELECT s1.id, s2.id
> FROM STREAM1 AS s1
> JOIN STREAM2 AS s2
>   ON s1.`user` = s2.userName
>   AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
>
> which runs as a query but doesn't produce any results, most likely
> because Flink still treats it as a regular join instead of a
> time-windowed JOIN and therefore doesn't emit any results. (For
> reference: after executing the query, I convert the Table back to a
> stream via tEnv.toAppendStream, and I use Flink 1.8.0 for these tests.)
>
> My questions are now:
> 1. How do I see whether Flink treats my table result as a regular JOIN
>    result or a time-bounded JOIN?
> 2. What is the proper way to formulate my initial query, finding all
>    matching events within the same tumbling window?
>
> Best regards
> Theo Diefenthal
>
> [1]
> https://de.slideshare.net/FlinkForward/flink-forward-berlin-2018-xingcan-cui-stream-join-in-flink-from-discrete-to-continuous-115374183
> Slide 18
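
[Editor's note] The Oracle predicate quoted above (`TRUNC(TO_DATE('19700101', 'YYYYMMDD') + ts / 86 400 000)`) truncates an epoch-millisecond timestamp to its calendar day. For readers following along, here is a minimal, self-contained Java sketch of that same day-bucketing arithmetic (truncation to UTC days; class and method names are illustrative only, not part of any Flink or Oracle API):

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class DayBucket {
    // Equivalent of Oracle's TRUNC(TO_DATE('19700101','YYYYMMDD') + ts/86400000):
    // map an epoch-millisecond timestamp to the start of its UTC day.
    static long dayOf(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis)
                .truncatedTo(ChronoUnit.DAYS)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long a = 1_565_600_000_000L; // 2019-08-12T08:53:20Z
        long b = 1_565_630_000_000L; // 2019-08-12T17:13:20Z (same UTC day)
        // Two events join on the "same day" condition iff their buckets match:
        System.out.println(dayOf(a) == dayOf(b)); // prints "true"
    }
}
```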