Hi
Flink 版本 1.11.1 直接使用的cdc的包订阅bin-log
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("****")
.port(***)
.databaseList("**** ") // monitor all tables under inventory
database
.tableList("** ")
.username("**")
.password("***")
.deserializer(new StringDebeziumDeserializationSchema()) //
converts SourceRecord to String
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(sourceFunction)
.print().setParallelism(1); // use parallelism 1 for sink to
keep message ordering
env.execute();
}
使用代码和flink-sql一样的效果
在 2020/8/26 16:25,“china_tao”<[email protected]> 写入:
flink什么版本?用什么方式连接的?如果是flinksql的话,使用https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html,设置driver。
如果你mysql账号密码确定没有问题的话,可以在pom中把mysql的依赖去掉,把mysql连接的jar包房到flin的lib中,再提交一次试试。
--
Sent from: http://apache-flink.147419.n8.nabble.com/