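Hi everyone,

I wrote some Java code that reads data from MySQL through CDC and writes it to a print table, but nothing is actually printed. The code reports no errors and simply stays in the running state.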

*Run output in IDEA:*

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
十二月 02, 2020 9:31:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to localhost:3307 at mysql-bin.000002/2556 (sid:6668, cid:11)

*MySQL configuration:*

binlog_format     ROW
log_bin           ON
binlog_row_image  FULL

*Main Java code:*

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class mysqlSourceAndSink {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSettings);

        // Source table: reads the MySQL binlog through the mysql-cdc connector.
        String cdc_user_id = "create table cdc_user_id(\n" +
                "id INT \n" +
                ",pid INT \n" +
                ",PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3307',\n" +
                " 'username' = 'flink',\n" +
                " 'password' = '123456',\n" +
                " 'server-id' = '6668',\n" +
                // " 'server-time-zone' = 'UTC',\n" +
                " 'server-time-zone' = 'Asia/Shanghai',\n" +
                " 'database-name' = 'flinktest',\n" +
                " 'table-name' = 'flinktest.user_id'\n" +
                ")";

        // Sink table: the print connector, writing to standard error.
        String table_print = "create table table_print( \n" +
                "id bigint,\n" +
                "pid bigint\n" +
                ") WITH(\n" +
                "'connector' = 'print',\n" +
                "'print-identifier' = 'pppp',\n" +
                "'standard-error' = 'true'\n" +
                ")";

        tableEnv.executeSql(table_print);
        // tableEnv.executeSql(jdbc_user_id);
        tableEnv.executeSql(cdc_user_id);

        // Copy every row from the CDC source into the print sink.
        String cdcUserPid2Print = "insert into table_print select id, pid from cdc_user_id";
        tableEnv.executeSql(cdcUserPid2Print);
    }
}

*The MySQL error log shows the following:*

2020-12-02T13:29:52.412476Z 8 [Note] Start binlog_dump to master_thread_id(8) slave_server(6668), pos(mysql-bin.000002, 2273)
2020-12-02T13:31:03.416362Z 7 [Note] Aborted connection 7 to db: 'unconnected' user: 'flink' host: 'localhost' (Got an error reading communication packets)
2020-12-02T13:31:36.146219Z 8 [Note] Aborted connection 8 to db: 'unconnected' user: 'flink' host: 'localhost' (failed on flush_net())
2020-12-02T13:31:45.575431Z 11 [Note] Start binlog_dump to master_thread_id(11) slave_server(6668), pos(mysql-bin.000002, 2556)

*The account has the required grants:*

create user 'flink'@'%' identified by '123456';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink' IDENTIFIED BY '123456';
flush privileges;

I also tested with Flink's JDBC connector and saw the same behavior: no errors, and no data printed.

Could someone help me figure out what is causing this? Thanks.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
