hi
<b>Creating a Kafka table through the Flink JDBC Driver fails with an error. Is my CREATE TABLE code incorrect? The code is as follows:</b>
// Connect through the Flink JDBC driver, using the blink planner.
Connection connection =
    DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();

// Register a Kafka-backed table that reads '|'-delimited CSV records.
statement.executeUpdate(
    "CREATE TABLE table_kafka (\n" +
    " user_id BIGINT,\n" +
    " item_id BIGINT,\n" +
    " category_id BIGINT,\n" +
    " behavior STRING,\n" +
    " ts TIMESTAMP(3),\n" +
    " proctime as PROCTIME(),\n" +
    " WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
    ") WITH (\n" +
    " 'connector.type' = 'kafka', \n" +
    " 'connector.version' = 'universal', \n" +
    " 'connector.topic' = 'flink_im02', \n" +
    " 'connector.properties.group.id' = 'flink_im02_new',\n" +
    " 'connector.startup-mode' = 'earliest-offset', \n" +
    " 'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
    " 'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
    " 'format.type' = 'csv',\n" +
    " 'format.field-delimiter' = '|'\n" +
    ")");

// Query the table and print the first two columns.
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
    // user_id and item_id are BIGINT, so read them as long rather than int.
    System.out.println(rs1.getLong(1) + ", " + rs1.getLong(2));
}
statement.close();
connection.close();
<b>Error:</b>
Reason: Required context properties mismatch.
The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
赵峰