You can check the plan after optimization to verify whether it is a regular
join or a time-bounded join (a time-bounded join should contain a WindowJoin
node). The most direct way is to set a breakpoint in the optimization phase
[1][2].
You can also create an ITCase with your test data for debugging [3]
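If you'd rather not attach a debugger, printing the optimized plan also works. A minimal sketch (Flink 1.8 Java API; table registration is elided and the class name is illustrative — STREAM1/STREAM2 and the query are taken from your mail):

```java
// Sketch: print the optimized plan and look for a WindowJoin node.
// Assumes Flink 1.8; STREAM1/STREAM2 must already be registered with
// ts declared as a rowtime attribute -- registration is elided here.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class PlanCheck {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // ... register STREAM1 and STREAM2 with ts.rowtime here ...
        Table result = tEnv.sqlQuery(
                "SELECT s1.id, s2.id FROM STREAM1 AS s1 JOIN STREAM2 AS s2 "
                + "ON s1.`user` = s2.userName "
                + "AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR "
                + "AND s2.ts + INTERVAL '24' HOUR");
        // A time-bounded join shows a WindowJoin node in the printed plan;
        // a regular join shows a plain Join node instead.
        System.out.println(tEnv.explain(result));
    }
}
```

If the printed plan contains WindowJoin, the query was recognized as a time-bounded join; a plain Join node means Flink fell back to a regular join.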


[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala#L148
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/StreamOptimizer.scala#L68
[3]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala
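For the ITCase route, a self-contained local harness along the lines of [3] could look roughly like this (Flink 1.8 API; the sample data, watermark strategy, and class name are illustrative assumptions, not taken from your setup):

```java
// Sketch of a local debugging job: register two streams with rowtime
// attributes, run the interval join, and print the append stream.
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class WindowJoinDebugJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple3<Integer, Long, String>> s1 = env
                .fromElements(Tuple3.of(1, 1000L, "alice"),
                              Tuple3.of(2, 2000L, "bob"))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<
                                Tuple3<Integer, Long, String>>(Time.seconds(1)) {
                            @Override
                            public long extractTimestamp(
                                    Tuple3<Integer, Long, String> e) {
                                return e.f1; // ts field carries the event time
                            }
                        });
        // The second stream would be built the same way (elided here).
        DataStream<Tuple3<Integer, Long, String>> s2 = s1;

        // "ts.rowtime" declares the element timestamp as an event-time attribute.
        tEnv.registerDataStream("STREAM1", s1, "id, ts.rowtime, user");
        tEnv.registerDataStream("STREAM2", s2, "id, ts.rowtime, userName");

        Table joined = tEnv.sqlQuery(
                "SELECT s1.id, s2.id FROM STREAM1 AS s1 JOIN STREAM2 AS s2 "
                + "ON s1.`user` = s2.userName "
                + "AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR "
                + "AND s2.ts + INTERVAL '24' HOUR");

        // toAppendStream only succeeds if the join was recognized as
        // time-bounded; a regular join over rowtime attributes fails earlier.
        tEnv.toAppendStream(joined, Row.class).print();
        env.execute("window-join-debug");
    }
}
```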

Best Regards,
Zhenghua Gao


On Mon, Aug 12, 2019 at 10:49 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi there,
>
> Currently, I'm trying to write a SQL query which executes a
> time-windowed/bounded JOIN on two data streams.
>
> Suppose I have stream1 with attributes id, ts, user and stream2 with
> attributes id, ts, userName. I want to receive the natural JOIN of both
> streams, restricted to events of the same day.
>
> In Oracle (With a ts column as number instead of Timestamp, for historical
> reasons), I do the following:
>
> SELECT *
>   FROM STREAM1
>   JOIN STREAM2 ON STREAM1."user" = STREAM2."userName"
>                 AND TRUNC(TO_DATE('19700101', 'YYYYMMDD') + (1 / 24 / 60 / 60 / 1000) * STREAM1."ts")
>                   = TRUNC(TO_DATE('19700101', 'YYYYMMDD') + (1 / 24 / 60 / 60 / 1000) * STREAM2."ts");
>
> which yields 294 rows with my test data (14 elements from stream1 match to
> 21 elements in stream2 on the one day of test data). Now I want to query
> the same in Flink. So I registered both streams as tables and properly
> registered the event-time attribute (by specifying ts.rowtime as a table column).
>
> My goal is to produce a time-windowed JOIN so that, if both streams
> advance their watermark far enough, an element is written out into an
> append only stream.
>
> First try (to conform time-bounded-JOIN conditions):
>
> SELECT s1.id, s2.id
>   FROM STREAM1 AS s1
>   JOIN STREAM2 AS s2
>     ON s1.`user` = s2.userName
>        AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
>        AND s2.ts BETWEEN s1.ts - INTERVAL '24' HOUR AND s1.ts + INTERVAL '24' HOUR
>        AND TUMBLE_START(s1.ts, INTERVAL '1' DAY) = TUMBLE_START(s2.ts, INTERVAL '1' DAY) -- Reduce to matches on the same day.
>
> This yielded the exception "Rowtime attributes must not be in the input
> rows of a regular join. As a workaround you can cast the time attributes of
> input tables to TIMESTAMP before.". So I'm still in the territory of regular
> joins, not time-windowed JOINs, even though I specified explicit BETWEEN
> bounds for both input streams!
>
> Then I found [1], which is essentially my query but without the last
> condition (reduce to matches on the same day). I tried that one as well,
> just to have a starting point, but the error is the same.
> I then reduced the condition to just one time bound:
>
> SELECT s1.id, s2.id
>   FROM STREAM1 AS s1
>   JOIN STREAM2 AS s2
>     ON s1.`user` = s2.userName
>        AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
>
> which runs as a query but doesn't produce any results. Most likely because
> Flink still treats it as a regular join instead of a time-windowed JOIN and
> doesn't emit any results. (FYI, after executing the query, I convert the
> Table back to a stream via tEnv.toAppendStream, and I use Flink 1.8.0 for
> these tests).
>
> My questions are now:
> 1. How do I see whether Flink treats my query as a regular JOIN or a
> time-bounded JOIN?
> 2. What is the proper way to formulate my initial query, finding all
> matching events within the same tumbling window?
>
> Best regards
> Theo Diefenthal
>
> [1]
> https://de.slideshare.net/FlinkForward/flink-forward-berlin-2018-xingcan-cui-stream-join-in-flink-from-discrete-to-continuous-115374183
> Slide 18
>
