Hi
        Flink 版本 1.11.1  直接使用的cdc的包订阅bin-log

 public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("****")
                .port(***)
                .databaseList("**** ") // monitor all tables under inventory 
database
                .tableList("** ")
                .username("**")
                .password("***")
                .deserializer(new StringDebeziumDeserializationSchema()) // 
converts SourceRecord to String
                .build();

        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for sink to 
keep message ordering
        env.execute();
    }

使用代码和flink-sql一样的效果
 
在 2020/8/26 16:25,“china_tao”<[email protected]> 写入:

    
flink什么版本?用什么方式连接的?如果是flinksql的话,使用https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html,设置driver。
    如果你mysql账号密码确定没有问题的话,可以在pom中把mysql的依赖去掉,把mysql连接的jar包房到flin的lib中,再提交一次试试。
    
    
    
    
    
    
    --
    Sent from: http://apache-flink.147419.n8.nabble.com/

回复