您好:
非常感谢您的建议,我已经成功解决了这个问题,但是我又发现了一个新的问题,我这里设置的超时时间是一分钟或者超时行数是5000行,
我在这期间更新了维表数据,但是我发现已经超过了超时时间,输出结果仍然没有被更新,是我理解的有问题么?
我尝试了停止输入流数据直到达到超时时间后仍然没有更新维表,除非停止整个程序,否则我的维表数据都不会被更新。
请问这个问题有解决的办法么?
def register_mysql_source(st_env):
source_ddl = \
"""
CREATE TABLE dim_mysql (
id int, --
type varchar --
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3390/test',
'table-name' = 'flink_test',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = '****',
'password' = '****',
'lookup.cache.max-rows' = '5000',
'lookup.cache.ttl' = '1s',
'lookup.max-retries' = '3'
)
"""
st_env.sql_update(source_ddl)
感谢!
琴师
发件人: Leonard Xu
发送时间: 2020-07-22 10:54
收件人: user-zh
主题: Re: flinksql1.11中主键声明的问题
Hi,
你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。
在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。
我理解你把connector的with参数更新成新的就解决问题了。
Best
Leonard Xu
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options>
>
> def register_rides_source(st_env):
> source_ddl = \
> """
> create table source1(
> id int,
> time1 varchar ,
> type string
> ) with (
> 'connector.type' = 'kafka',
> 'connector.topic' = 'tp1',
> 'connector.startup-mode' = 'latest-offset',
> 'connector.properties.bootstrap.servers' = 'localhost:9092',
> 'connector.properties.zookeeper.connect' = 'localhost:2181',
> 'format.type' = 'json',
> 'connector.version' = 'universal',
> 'update-mode' = 'append'
> )
> “""