hi,大佬们好,我用写了段java代码,通过cdc读取mysql的数据并通过print-table打印出来,但实际没打印,代码也不报错,一直处于运行状态

*idea中运行信息如下:*
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further
details.
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver
class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered
via the SPI and manual loading of the driver class is generally unnecessary.
十二月 02, 2020 9:31:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient
connect
信息: Connected to localhost:3307 at mysql-bin.000002/2556 (sid:6668, cid:11)




*mysql相关配置:*
binlog_format   ROW
log_bin ON
binlog_row_image        FULL




*java主要代码如下*

import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class mysqlSourceAndSink {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
envSettings);


        String cdc_user_id = "create table cdc_user_id(\n" +
                "id INT \n" +
                ",pid INT \n" +
                ",PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3307',\n" +
                " 'username' = 'flink',\n" +
                " 'password' = '123456',\n" +
                " 'server-id' = '6668',\n" +
//                " 'server-time-zone' = 'UTC',\n" +
                " 'server-time-zone' = 'Asia/Shanghai',\n" +
                " 'database-name' = 'flinktest',\n" +
                " 'table-name' = 'flinktest.user_id'\n" +
                ")";

        String table_print = "create table table_print( \n" +
                "id bigint,\n" +
                "pid bigint\n" +
                ") WITH(\n" +
                "'connector' = 'print',\n" +
                "'print-identifier' = 'pppp',\n" +
                "'standard-error' = 'true'\n" +
                ")";

        tableEnv.executeSql(table_print);
//        tableEnv.executeSql(jdbc_user_id);
        tableEnv.executeSql(cdc_user_id);

       String cdcUserPid2Print = "insert into table_print select id, pid
from cdc_user_id";
        tableEnv.executeSql(cdcUserPid2Print);



*
mysql的err日志中有如下打印*

2020-12-02T13:29:52.412476Z 8 [Note] Start binlog_dump to
master_thread_id(8) slave_server(6668), pos(mysql-bin.000002, 2273)
2020-12-02T13:31:03.416362Z 7 [Note] Aborted connection 7 to db:
'unconnected' user: 'flink' host: 'localhost' (Got an error reading
communication packets)
2020-12-02T13:31:36.146219Z 8 [Note] Aborted connection 8 to db:
'unconnected' user: 'flink' host: 'localhost' (failed on flush_net())
2020-12-02T13:31:45.575431Z 11 [Note] Start binlog_dump to
master_thread_id(11) slave_server(6668), pos(mysql-bin.000002, 2556)



*
账号有相关授权*
create user 'flink'@'%' identified by '123456';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'flink' IDENTIFIED BY '123456';
flush privileges;



另外我用flink的jdbc测试也是类似的,不报错,也没把数据print出来

烦请大佬帮忙看看我这是什么原因,谢谢





--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复