hi,
请问kafka join jdbc维表数据而不是join jdbc的changelog,支持吗?
在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished,这样的话按理来说不管维表数据怎么变kafka都join不到维表数据了呀?
CREATE TABLE orders (
order_id STRING,
currency STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (
'connector' = 'kafka',
'topic' = 'topic_flink',
'properties.bootstrap.servers' = '10.3.12.113:9092',
'properties.group.id' = 'flink',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
CREATE TABLE latest_rates (
currency STRING,
rate DECIMAL(38, 10),
currency_time TIMESTAMP(3),
WATERMARK FOR currency_time AS currency_time,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' =
'jdbc:mysql://10.3.12.113:3306/base?useUnicode=true&characterEncoding=utf8&serverTimezone=PRC&useSSL=false'
'username' = 'root',
'password' = 'root1234',
'table-name' = 'latest_rates',
'lookup.cache.max-rows' = '1',
'lookup.cache.ttl' = '1min'
)
SELECT
o.order_id,
o.order_time,
o.amount * r.rate AS amount,
r.currency
FROM orders AS o
LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency"
best,
amenhub