Hello, I'm a beginner with Flink, and after trying to solve my problems for several days I decided to ask on the list.
My goal is to join two Kafka topics that share a common ID field and produce the enriched object to a third topic based on a tumble window, because the result has to be applied to a database. I am struggling with the following problems:

1. Since I'm using pyflink, I found that the StreamTableEnvironment.connect method is listed as deprecated. That's why I decided to stick with SQL, but I am not sure what the equivalent of connect is. (The first sketch below shows how I set things up at the moment.)

2. Working on the first problem, I created two dynamic source tables following
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/maxwell.html
because the source stream is Maxwell CDC. Here is the DDL:

    CREATE TABLE device (
        `id` VARCHAR,
        `type` VARCHAR,
        `u_ts` BIGINT,
        row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(u_ts)),
        WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '5' SECOND,
        PRIMARY KEY (id, type) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'format' = 'maxwell-json',
        'topic' = 'device',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'latest-offset'
    )

The second source has the same format. Everything works fine until I try any kind of join. For example, an interval join (second sketch below) leads to:

> pyflink.util.exceptions.TableException: IntervalJoin doesn't support
> consuming update and delete changes which is produced by node
> ChangelogNormalize(key=[id, typ])

How can I join on the final applied state of the records?

3. Since a tumble window does not support joins, how can I keep the dynamic tables updated, yet join them at a regular interval (e.g. TUMBLE_START(row_ts, INTERVAL '1' HOUR)) and INSERT only the changed records into the sink topic? (The third sketch below shows roughly what I am after.) The two source topics come from one dimension table (slowly updated) and one fact table, and I have to be able to join outside the watermark interval, because a record in the dimension table could have been last updated days ago.
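
To make the question concrete, here is a minimal sketch of my current setup (pyflink 1.12). I left out the table definition itself; it is the DDL from point 2:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode()
            .use_blink_planner()
            .build())

    # The CREATE TABLE statement from point 2, kept as a plain string.
    device_ddl = "CREATE TABLE device (...)"  # full DDL as shown above

    # execute_sql() with a CREATE TABLE DDL is what I am using in place
    # of the deprecated connect() API -- is this the intended replacement?
    t_env.execute_sql(device_ddl)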
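
This is the shape of the interval join that produces the TableException quoted above. The names of the second table and its columns are placeholders (I call them measurement and device_id here; the real table is defined with the same Maxwell DDL pattern as device):

    # roughly the interval join I attempted
    result = t_env.sql_query("""
        SELECT d.id, d.`type`, m.*
        FROM device d, measurement m
        WHERE d.id = m.device_id
          AND m.row_ts BETWEEN d.row_ts - INTERVAL '1' HOUR
                           AND d.row_ts + INTERVAL '5' SECOND
    """)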
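
And this is conceptually what I would like to end up with for point 3, even though I understand it is not supported as written (enriched_sink is a hypothetical Kafka sink table; measurement and device_id are the same placeholders as above):

    # hypothetical insert-only sink topic for the enriched records
    t_env.execute_sql("""
        CREATE TABLE enriched_sink (
            window_start TIMESTAMP(3),
            device_id VARCHAR,
            `type` VARCHAR
        ) WITH (
            'connector' = 'kafka',
            'format' = 'json',
            'topic' = 'enriched',
            'properties.bootstrap.servers' = 'localhost:9092'
        )
    """)

    # what I mean by "join on a regular interval": join against the
    # latest state of device, but emit one record per key and per hour
    t_env.execute_sql("""
        INSERT INTO enriched_sink
        SELECT
            TUMBLE_START(m.row_ts, INTERVAL '1' HOUR),
            m.device_id,
            d.`type`
        FROM measurement m
        JOIN device d ON m.device_id = d.id
        GROUP BY
            TUMBLE(m.row_ts, INTERVAL '1' HOUR),
            m.device_id,
            d.`type`
    """)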
Thank you very much in advance!