Hi, 你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。 在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。 我理解你把connector的with参数更新成新的就解决问题了。
Best Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options> > > def register_rides_source(st_env): > source_ddl = \ > """ > create table source1( > id int, > time1 varchar , > type string > ) with ( > 'connector.type' = 'kafka', > 'connector.topic' = 'tp1', > 'connector.startup-mode' = 'latest-offset', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'format.type' = 'json', > 'connector.version' = 'universal', > 'update-mode' = 'append' > ) > “""
