hi!
您好,我明白您的意思了,并且看了下网上的资料,改完后如下
DDL:
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME()
) WITH (
'connector.type' = 'kafka', -- kafka connector
'connector.version' = 'universal', -- universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior', -- kafka topic
'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = '', -- zk 地址
'connector.properties.bootstrap.servers' = '', -- broker 地址
'format.type' = 'json' -- 数据源格式为 json
);
CREATE TABLE category_info (
parent_id BIGINT, -- 商品大类
category_id BIGINT -- 商品详细类目
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://:3306/flinkdemo',
'connector.table' = 'category_info',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = '',
'connector.password' = '',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '10min'
);
SQL:
SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF
U.proctime AS C
ON U.category_id = C.category_id;
但是执行SQL报错了(由于代码在办公环境粘不出来,就手打如下部分):
org.apache.flink.table.api.SqlParserExcption:Sql parse failed.Encountered
"timestamp,"at line
Was expecting one of:
"CURSOR"...
"EXISTS"...
"NOT"...
"ROW"...
"("...
一直调试不好,望指教
在2020年7月24日 14:25,Leonard Xu<[email protected]> 写道:
Hello
图挂了,可以贴下DDL吗?另外你没有使用维表join语法 FOR SYSTEM_TIME AS OF[1]
祝好
Leonard Xu
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins>
在 2020年7月24日,14:14,liunaihua521 <[email protected]> 写道:
hi!
版本:flink 1.10
mysql 5.7.24
需求场景是:
使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql
connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?
现在本地测试时,维表的DDL是:
但是去mysql修改了数据后,join操作还是旧数据.
望大神们指点方向,提前谢谢了.