Hello,

I hit the same error on the latest Flink 1.11 release, following the same steps as you.
The real cause is that this DDL defines a Flink sink table, i.e. the write side of the pipeline, so its data cannot be printed.
The call tableEnv.toRetractStream(table, Row.class).print();
only works on a Flink source table, i.e. the read side — for example, a Kafka table can be printed this way without problems.
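In other words, a sink table should be the target of an INSERT INTO, not the source of a query. A minimal sketch of the intended usage — note that the source table name `user_behavior_src` is hypothetical, since only the sink DDL appears in this thread:

```sql
-- Write into the Elasticsearch sink table instead of selecting from it.
-- 'user_behavior_src' is a placeholder for whatever source table
-- (e.g. a Kafka table) actually produces the data.
INSERT INTO buy_cnt_per_hour
SELECT hour_of_day, buy_cnt
FROM user_behavior_src;
```

Submitted via tableEnv.sqlUpdate(...), this statement makes Flink look up a TableSinkFactory for the Elasticsearch connector, which is what the connector actually provides.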


2020-07-09 15:31:56




------------------ Original Message ------------------
From: "出发" <573693...@qq.com>;
Sent: Monday, March 23, 2020, 11:30 PM
To: "user-zh" <user-zh@flink.apache.org>;

Subject: ddl es error



The source code is as follows:
CREATE TABLE buy_cnt_per_hour (
  hour_of_day BIGINT,
  buy_cnt BIGINT
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'http://localhost:9200',
  'connector.index' = 'buy_cnt_per_hour',
  'connector.document-type' = 'user_behavior',
  'connector.bulk-flush.max-actions' = '1',
  'format.type' = 'json',
  'update-mode' = 'append'
)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ESTest {

    public static void main(String[] args) throws Exception {

        // Set up the streaming and table execution environments (Blink planner)
        StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        String sinkDDL = "CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt BIGINT"
                + ") WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6',"
                + "'connector.hosts' = 'http://localhost:9200',"
                + "'connector.index' = 'buy_cnt_per_hour',"
                + "'connector.document-type' = 'user_behavior',"
                + "'connector.bulk-flush.max-actions' = '1',"
                + "'format.type' = 'json',"
                + "'update-mode' = 'append' )";
        tableEnv.sqlUpdate(sinkDDL);
        Table table = tableEnv.sqlQuery("select * from test_es");
        tableEnv.toRetractStream(table, Row.class).print();
        streamEnv.execute("");
    }

}

The exact error:

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'elasticsearch'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.bulk-flush.max-actions=1
connector.document-type=user_behavior
connector.hosts=http://localhost:9200
connector.index=buy_cnt_per_hour
connector.type=elasticsearch
connector.version=6
format.type=json
schema.0.data-type=BIGINT
schema.0.name=hour_of_day
schema.1.data-type=BIGINT
schema.1.name=buy_cnt
update-mode=append
