hi: 今天又试了下, 我这边出现问题是因为: join时使用的问题 照成的
应该使用这种语法 -- temporal join the JDBC table as a dimension tableSELECT*FROMmyTopicLEFTJOINMyUserTableFORSYSTEM_TIMEASOFmyTopic.proctimeONmyTopic.key=MyUserTable.id; 而不是 SELECT*FROMmyTopic aLEFTJOINMyUserTablebON a.id = b.id 文档连接在这里, https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache 希望对你有帮助 在 2021-04-14 16:47:04,"anonnius" <[email protected]> 写道: >+1, 目前也遇到了 >在 2021-01-21 17:52:06,"刘海" <[email protected]> 写道: >>HI! >>这边做测试时遇到一个问题: >>在流应用中使用了一个mysql jdbc的source作为维表,为了优化处理效率使用了Lookup Cache,下面是注册的表: >>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid >>DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT >>ENFORCED\n" + >>") WITH (" + >>"'connector' = 'jdbc'," + >>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," + >>"'table-name' = 'tm_dealers'," + >>"'driver' = 'com.mysql.cj.jdbc.Driver'," + >>"'username' = 'root'," + >>"'password' = 'Cdh2020:1'," + >>"'lookup.cache.max-rows' = '500',"+ >>"'lookup.cache.ttl' = '1800s',"+ >>"'sink.buffer-flush.interval' = '60s'"+ >>")"); >> >> >>我发现这样的话checkpoint配置会失效,不能触发检查点,日志报如下错误: >>job bad9f419433f78d24e703e659b169917 is notin state RUNNING but FINISHED >>instead. Aborting checkpoint. >> >> >>进入WEB UI 看一下视图发现该Execution处于FINISHED状态,FINISHED状态无法进行checkpoint,这种有其它办法吗? >> >> >>感谢大佬指导一下,拜谢! >>| | >>刘海 >>| >>| >>[email protected] >>| >>签名由网易邮箱大师定制
