hi,

请问kafka join jdbc维表数据而不是join jdbc的changelog,支持吗?

在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished,这样的话按理来说不管维表数据怎么变kafka都join不到维表数据了呀?

CREATE TABLE orders (
    order_id STRING,
    currency STRING,
    amount INT,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic_flink',
    'properties.bootstrap.servers' = '10.3.12.113:9092',
    'properties.group.id' = 'flink',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
)

CREATE TABLE latest_rates (
    currency STRING,
    rate DECIMAL(38, 10),
    currency_time TIMESTAMP(3),
    WATERMARK FOR currency_time AS currency_time,
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 
'jdbc:mysql://10.3.12.113:3306/base?useUnicode=true&characterEncoding=utf8&serverTimezone=PRC&useSSL=false'
    'username' = 'root',
    'password' = 'root1234',
    'table-name' = 'latest_rates',
    'lookup.cache.max-rows' = '1',
    'lookup.cache.ttl' = '1min'
)

SELECT
    o.order_id,
    o.order_time,
    o.amount * r.rate AS amount,
    r.currency
FROM orders AS o
LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency"

best,
amenhub





回复