Hello all!

I'm trying to design a stream pipeline, and I'm having trouble controlling
when a JOIN triggers an update:

Setup:

   - The Event table: the "probe side" / "query side", itself the result of
   earlier stream processing
   - The DimensionAtJoinTimeX tables: updating tables, the "build side", also
   the results of earlier stream processing

Joining them:

SELECT    *
FROM      Event e
LEFT JOIN DimensionAtJoinTime1 d1
  ON      e.uid = d1.uid
LEFT JOIN DimensionAtJoinTime2 d2
  ON      e.uid = d2.uid

The DimensionAtJoinTimeX Tables being the result of earlier stream
processing, possibly from the same Event table:

SELECT   uid,
         hop_start(...),
         sum(...)
FROM     Event e
GROUP BY uid,
         hop(...)

The Event Table being:

SELECT ...
FROM   EventRawInput i
WHERE  i.some_field = 'some_value'

Requirements:

   - I need the JOINs to be executed only once, namely when a new row is
   appended to the probe / query / Event table.
   - I also need the full pipeline to be defined in SQL.
   - I very strongly prefer the Blink planner (mainly for Deduplication,
   TopN and LAST_VALUE features).

Problem exploration so far:

   - Option 1, "FOR SYSTEM_TIME AS OF" [1]: I need the solution to be in SQL,
   so this doesn't work out as is. But I might explore the following: insert
   DimensionAtJoinTimeX into a special Sink, and use it in a
   LookupableTableSource (I'm at a loss on how to do that, though. Do I need
   an external kv store?).
   - Option 2, "FOR SYSTEM_TIME AS OF" [1], used in SQL: Is there a version
   of "FOR SYSTEM_TIME AS OF" readily usable in SQL? I might have missed
   something in the documentation.
   - Option 3, "LATERAL TABLE table_function" [2], on the Legacy planner:
   It does not work with two tables [3], and I don't get to have the Blink
   planner features.
   - Option 4, "LATERAL TABLE table_function" [2], on the Blink planner: It
   does not work with the "probe side" being the results of earlier stream
   processing [4].
   - Option 5, let a regular JOIN materialize the updates, and somehow
   filter out the ones coming from the build sides (I'm at a loss on how to
   do that).
   - Option 6, "TVR": I read this paper [5], which mentions "Time-Varying
   Relations". Speculating here: could there be a way to say that the build
   side is not a TVR, i.e. to declare the stream as being somehow "static"
   while it is still being updated? (But I guess we're back to "FOR
   SYSTEM_TIME AS OF".)
   - Option 7: Are there features being developed, or hints, or
   workarounds to control the JOIN updates that I have not considered so far?
   - Remark 1: I believe that FLINK-15112 and FLINK-14200 are of the same
   nature, even though they occur in different situations on different
   planners (same exception stack trace, in files that share the same
   historical parent from before the Blink fork). FLINK-15112 has a
   workaround, but FLINK-14200 does not. The existence of that workaround
   IMHO signals that there is a simple fix for both bugs. I have tried to
   find it in Flink for a few days, with no success so far. If you have
   pointers that would help me provide a fix, I'll gladly listen. What I have
   found so far: the problem revolves around the Calcite-based Flink
   streaming rules that transform a temporal table function correlate into
   a Join on 2*Scan, which crash when they encounter something that is not a
   table that can be readily scanned. There are also shenanigans around
   finding the right schema in the Catalog. But I am blocked now, and not
   accustomed to the Flink internal code (I would like to be, though, if
   Alibaba/Ververica are recruiting remote workers, wink wink, nudge nudge).
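
To make Options 1 and 2 concrete, here is a rough sketch of the query shape
I am after, modelled on the temporal table join syntax as I read it in the
1.9 docs. It is only a sketch under assumptions: e.proctime is a
hypothetical processing-time attribute on Event, and each
DimensionAtJoinTimeX would have to be exposed as a lookupable table, which
is precisely the part I do not know how to do.

```sql
-- Sketch only: assumes Event has a processing-time attribute (hypothetical
-- name: proctime) and that d1 / d2 are backed by a LookupableTableSource.
SELECT    *
FROM      Event e
LEFT JOIN DimensionAtJoinTime1 FOR SYSTEM_TIME AS OF e.proctime AS d1
  ON      e.uid = d1.uid
LEFT JOIN DimensionAtJoinTime2 FOR SYSTEM_TIME AS OF e.proctime AS d2
  ON      e.uid = d2.uid
```

As I understand it, with this shape each Event row would look up the
dimension values once, when it arrives, and later updates on the build side
would not re-emit the joined row.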

All opinions are very much welcome on all Options and Remarks!

Cheers, and a happy new year to all,
Benoît

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#processing-time-temporal-joins

[3] https://issues.apache.org/jira/browse/FLINK-15112

[4] https://issues.apache.org/jira/browse/FLINK-14200

[5] https://arxiv.org/pdf/1905.12133.pdf
