Hello,
I'm a beginner in Flink and after trying to solve my problems for several
days i decided to ask in the list.

My goal is to connect two kafka topics which have a common ID field then
produce the enriched object to a third topic based on a Tumble Window
because the result has to be applied in a database. Im struggling on the
following problems:

1. As I'm using pyflink I found that the StreamTableEnvironment.connect method
is listed as obsolete. That's why I decided to stick with SQL but i am not
sure what is the equivalent of the connect.
2. Working on the 1st problem I created two dynamic source tables using
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/maxwell.html
because the source stream is Maxwell CDC. Here is the DDL:

CREATE TABLE device(

`id` VARCHAR,
`type` VARCHAR,
`u_ts` BIGINT,
row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(u_ts)),
WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '5' SECOND,
PRIMARY KEY (id, type) NOT ENFORCED

) WITH (

'connector' = 'kafka',
'format' = 'maxwell-json',
'topic' = 'device',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'latest-offset'

)

The second source is the same format. Everything is working fine until I
try any type of join. For example the Interval Join leads to:

> *pyflink.util.exceptions.TableException: IntervalJoin doesn't support
> consuming update and delete changes which is produced by node
> ChangelogNormalize(key=[id, typ])*

How can I use the final applied state of the records in a join?

3. As Tumble window does not support joins how can I keep the dynamic
tables updated but join them on a regular interval(for ex. TUMBLE_START(row_ts,
INTERVAL '1' HOUR)) and produce(INSERT) in the sink topic only the changed
records? The two source topics are from one dimensional(slowly updated) and
one fact tables where I have to be able to join outside the watermark
interval because a record from the dim table could be last updated days ago.

Thank you very much in advance!

Reply via email to