Table DDL:
-- Table backed by the Elasticsearch connector (version 6), serialized as JSON,
-- in append update mode.
CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,  -- presumably the hour bucket; confirm against the producing query
    buy_cnt     BIGINT   -- presumably the number of purchases in that hour; confirm
) WITH (
    -- Connector selection and cluster address.
    'connector.type'                   = 'elasticsearch',
    'connector.version'                = '6',
    'connector.hosts'                  = 'http://localhost:9200',

    -- Target index and document type.
    'connector.index'                  = 'buy_cnt_per_hour',
    'connector.document-type'          = 'user_behavior',

    -- NOTE(review): a bulk size of 1 presumably flushes every row on its own,
    -- which suits a demo but not a high-throughput job -- confirm before reuse.
    'connector.bulk-flush.max-actions' = '1',

    -- Write mode and wire format.
    'update-mode'                      = 'append',
    'format.type'                      = 'json'
)
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class ESTest {
public static void main(String[] args) throws Exception {
//2??????????????
StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);
String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt
BIGINT "
+ ") WITH ( 'connector.type' = 'elasticsearch',
'connector.version' = '6',"
+ " 'connector.hosts' = 'http://localhost:9200',
'connector.index' = 'buy_cnt_per_hour',"
+ " 'connector.document-type' = 'user_behavior',"
+ " 'connector.bulk-flush.max-actions' = '1',\n" + "
'format.type' = 'json',"
+ " 'update-mode' = 'append' )";
tableEnv.sqlUpdate(sinkDDL);
Table table = tableEnv.sqlQuery("select * from test_es ");
tableEnv.toRetractStream(table, Row.class).print();
streamEnv.execute("");
}
}
Running it fails with the following error:
The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched
properties: 'connector.type' expects 'filesystem', but is 'elasticsearch'
'format.type' expects 'csv', but is 'json' The following properties are
requested: connector.bulk-flush.max-actions=1
connector.document-type=user_behavior connector.hosts=http://localhost:9200
connector.index=buy_cnt_per_hour connector.type=elasticsearch
connector.version=6 format.type=json schema.0.data-type=BIGINT
schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt
update-mode=append