Hello,
I ran into this same error on the latest Flink 1.11 release, doing exactly what you did. The real cause is that this DDL declares a Flink sink table, that is, the side data is written to, so it cannot be queried to print data. The tableEnv.toRetractStream(table, Row.class).print(); way of printing data only works for a Flink source table, that is, the data input side. A Kafka table, for example, works fine with it.

2020-07-09 15:31:56

------------------ Original Message ------------------
From: "出发" <573693...@qq.com>
Sent: Monday, March 23, 2020, 11:30 PM
To: "user-zh" <user-zh@flink.apache.org>
Subject: ddl es error

The source is as follows:

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'buy_cnt_per_hour',
    'connector.document-type' = 'user_behavior',
    'connector.bulk-flush.max-actions' = '1',
    'format.type' = 'json',
    'update-mode' = 'append'
)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ESTest {

    public static void main(String[] args) throws Exception {

        // 2. Set up the execution environment
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,buy_cnt BIGINT "
                + ") WITH ( 'connector.type' = 'elasticsearch','connector.version' = '6',"
                + "'connector.hosts' = 'http://localhost:9200','connector.index' = 'buy_cnt_per_hour',"
                + "'connector.document-type' = 'user_behavior',"
                + "'connector.bulk-flush.max-actions' = '1',\n"
                + "'format.type' = 'json',"
                + "'update-mode' = 'append' )";
        tableEnv.sqlUpdate(sinkDDL);

        Table table = tableEnv.sqlQuery("select * from test_es ");
        tableEnv.toRetractStream(table, Row.class).print();

        streamEnv.execute("");
    }
}

Error details:

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'elasticsearch'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.bulk-flush.max-actions=1
connector.document-type=user_behavior
connector.hosts=http://localhost:9200
connector.index=buy_cnt_per_hour
connector.type=elasticsearch
connector.version=6
format.type=json
schema.0.data-type=BIGINT
schema.0.name=hour_of_day
schema.1.data-type=BIGINT
schema.1.name=buy_cnt
update-mode=append
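
For anyone reading this thread later: the "Mismatched properties" lines in the error can be understood with a small model. With the legacy 'connector.type' style of DDL, each table factory declares a set of required properties, and the properties from the WITH clause are compared against them. Because the query reads from the table, a source factory is looked for, and the only candidate listed in the error is the CSV file source, which does not match. The sketch below is only an illustration of that comparison in plain Java, not Flink's actual implementation. The class name FactoryMatchSketch and the method mismatches are made up for this example, and the required properties are taken from the error text rather than from Flink's source.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of property-based factory matching.
// This is NOT Flink code; it only mimics the messages seen in the error.
public class FactoryMatchSketch {

    // Compares the properties a factory requires with the properties the DDL
    // supplied and returns one message per property that does not match.
    static List<String> mismatches(Map<String, String> required,
                                   Map<String, String> requested) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : required.entrySet()) {
            String actual = requested.get(e.getKey());
            if (!e.getValue().equals(actual)) {
                result.add("'" + e.getKey() + "' expects '" + e.getValue()
                        + "', but is '" + actual + "'");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed required properties of the CSV file source, as implied by the error text.
        Map<String, String> csvSource = new LinkedHashMap<>();
        csvSource.put("connector.type", "filesystem");
        csvSource.put("format.type", "csv");

        // A subset of the properties requested by the DDL in this thread.
        Map<String, String> ddl = new LinkedHashMap<>();
        ddl.put("connector.type", "elasticsearch");
        ddl.put("connector.version", "6");
        ddl.put("format.type", "json");
        ddl.put("update-mode", "append");

        // Print each mismatch on its own line.
        for (String message : mismatches(csvSource, ddl)) {
            System.out.println(message);
        }
    }
}
```

Under this model the two printed lines correspond to the two mismatches in the error. The practical consequence is the same as in the reply: the Elasticsearch table should be the target of an INSERT INTO from some source table, not the table that is selected from.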