要在mysql表上t_stu上建个主键,只在FlinkSQL上指定主键不行。
> 在 2022年11月3日,11:54,yinghua...@163.com 写道: > > 要在mysql表上t_stu上建个主键,只在FlinkSQL上指定主键不行。 > > > > yinghua...@163.com > > 发件人: 左岩 > 发送时间: 2022-11-03 11:34 > 收件人: user-zh > 主题: flinkcdc 读不到mysql中数据 > 我用的是flink1.14 > ,因为官方没有匹配的版本,所以自己编译的flinkCDC,binlog也开启了,然后也没报错,读不到mysql的数据,idea控制台不报错也不输出数据,可能是什么原因呢(运行日志见附件) > public static void main(String[] args) throws Exception { > Configuration conf = new Configuration(); > conf.setInteger("rest.port", 10041); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(conf); > StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > > env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); > > env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3"); > > // StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > > env.setParallelism(4); > > // 建表 > tenv.executeSql("CREATE TABLE flink_t_stu ( " + > " userid INT, " + > " username string, " + > " age string, " + > " `partition` INT, " + > " PRIMARY KEY(userid) NOT ENFORCED " + > " ) WITH ( " + > " 'connector' = 'mysql-cdc', " + > " 'server-id' = '5401-5404', " + > " 'hostname' = '192.168.0.220', " + > " 'port' = '3306', " + > " 'username' = 'root', " + > " 'password' = 'root', " + > " 'database-name' = 'zy', " + > " 'table-name' = 't_stu' " + > ")"); > > // 查询 > tenv.executeSql("select * from flink_t_stu").print(); > > env.execute(); > > }