Re: 回复: Flink JDBC Driver是否支持创建流数据表
不是语法问题,我建表也没有问题,是查询报错。你有没有试查询数据或者数据写人文件表中 参考下这个文档: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector 下面的语法应该是不支持的: 'format.type' = 'csv',\n" + "'format.field-delimiter' = '|'\n" 下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90} tEnv.sqlUpdate("CREATE TABLE pick_order (\n" + "order_no VARCHAR,\n" + "status INT\n" + ") WITH (\n" + "'connector.type' = 'kafka',\n" + "'connector.version' = 'universal',\n" + "'connector.topic' = 'wanglei_test',\n" + "'connector.startup-mode' = 'latest-offset',\n" + "'connector.properties.0.key' = 'zookeeper.connect',\n" + "'connector.properties.0.value' = 'xxx:2181',\n" + "'connector.properties.1.key' = 'bootstrap.servers',\n" + "'connector.properties.1.value' = 'xxx:9092',\n" + "'update-mode' = 'append',\n" + "'format.type' = 'json',\n" + "'format.derive-schema' = 'true'\n" + ")"); 王磊 wangl...@geekplus.com.cn 发件人: 赵峰 发送时间: 2020-03-24 21:28 收件人: user-zh 主题: Flink JDBC Driver是否支持创建流数据表 hi Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下: Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink"); Statement statement = connection.createStatement(); statement.executeUpdate( "CREATE TABLE table_kafka (\n" + "user_id BIGINT,\n" + "item_id BIGINT,\n" + "category_id BIGINT,\n" + "behavior STRING,\n" + "ts TIMESTAMP(3),\n" + "proctime as PROCTIME(),\n" + "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" + ") WITH (\n" + "'connector.type' = 'kafka', \n" + "'connector.version' = 'universal', \n" + "'connector.topic' = 'flink_im02', \n" + "'connector.properties.group.id' = 'flink_im02_new',\n" + "'connector.startup-mode' = 'earliest-offset', \n" + "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" + "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" + "'format.type' = 'csv',\n" + "'format.field-delimiter' = '|'\n" + ")"); ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka"); while (rs1.next()) { System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2)); } statement.close(); connection.close(); 报错: Reason: Required context properties mismatch. The matching candidates: org.apache.flink.table.sources.CsvBatchTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' 赵峰 Quoted from: http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html 赵峰
Flink JDBC Driver是否支持创建流数据表
hi Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下: Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink"); Statement statement = connection.createStatement(); statement.executeUpdate( "CREATE TABLE table_kafka (\n" + "user_id BIGINT,\n" + "item_id BIGINT,\n" + "category_id BIGINT,\n" + "behavior STRING,\n" + "ts TIMESTAMP(3),\n" + "proctime as PROCTIME(),\n" + "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" + ") WITH (\n" + "'connector.type' = 'kafka', \n" + "'connector.version' = 'universal', \n" + "'connector.topic' = 'flink_im02', \n" + "'connector.properties.group.id' = 'flink_im02_new',\n" + "'connector.startup-mode' = 'earliest-offset', \n" + "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" + "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" + "'format.type' = 'csv',\n" + "'format.field-delimiter' = '|'\n" + ")"); ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka"); while (rs1.next()) { System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2)); } statement.close(); connection.close(); 报错: Reason: Required context properties mismatch. The matching candidates: org.apache.flink.table.sources.CsvBatchTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' 赵峰
Flink JDBC Driver可以创建kafka表吗?
hi Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下: Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink"); Statement statement = connection.createStatement(); statement.executeUpdate( "CREATE TABLE table_kafka (\n" + "user_id BIGINT,\n" + "item_id BIGINT,\n" + "category_id BIGINT,\n" + "behavior STRING,\n" + "ts TIMESTAMP(3),\n" + "proctime as PROCTIME(),\n" + "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" + ") WITH (\n" + "'connector.type' = 'kafka', \n" + "'connector.version' = 'universal', \n" + "'connector.topic' = 'flink_im02', \n" + "'connector.properties.group.id' = 'flink_im02_new',\n" + "'connector.startup-mode' = 'earliest-offset', \n" + "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" + "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" + "'format.type' = 'csv',\n" + "'format.field-delimiter' = '|'\n" + ")"); ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka"); while (rs1.next()) { System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2)); } statement.close(); connection.close(); 报错: Reason: Required context properties mismatch. The matching candidates: org.apache.flink.table.sources.CsvBatchTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' 赵峰