The DDL I am using is as follows:
-- Flink SQL sink table backed by Elasticsearch 6, declared with the legacy
-- 'connector.*' property style (Flink <= 1.10 descriptor-based factories).
-- Each row (one hourly buy count) is written as a JSON document into the
-- 'buy_cnt_per_hour' index.
-- NOTE(review): the factory-mismatch error quoted further down in this message
-- means the Elasticsearch table factory jar is not on the classpath (only the
-- CSV/filesystem factory was found) — the DDL itself is consistent; add the
-- flink-sql-connector-elasticsearch6 and flink-json jars to Flink's lib/.
CREATE TABLE buy_cnt_per_hour ( 
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'buy_cnt_per_hour',
    'connector.document-type' = 'user_behavior',
    -- flush after every single action: convenient for demos, low throughput
    'connector.bulk-flush.max-actions' = '1',
    'format.type' = 'json',
    -- append-only stream, no upserts/retractions written to ES
    'update-mode' = 'append'
)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ESTest {

    public static void main(String[] args) throws Exception {

        //2??????????????
        StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);
        String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,  buy_cnt 
BIGINT "
                + ") WITH ( 'connector.type' = 'elasticsearch',  
'connector.version' = '6',"
                + "    'connector.hosts' = 'http://localhost:9200',  
'connector.index' = 'buy_cnt_per_hour',"
                + "    'connector.document-type' = 'user_behavior',"
                + "    'connector.bulk-flush.max-actions' = '1',\n" + "    
'format.type' = 'json',"
                + "    'update-mode' = 'append' )";
        tableEnv.sqlUpdate(sinkDDL);
        Table table = tableEnv.sqlQuery("select * from test_es ");
        tableEnv.toRetractStream(table, Row.class).print();
        streamEnv.execute("");
    }

}
The reported error: "The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'elasticsearch'
'format.type' expects 'csv', but is 'json'
The following properties are requested:
connector.bulk-flush.max-actions=1
connector.document-type=user_behavior
connector.hosts=http://localhost:9200
connector.index=buy_cnt_per_hour
connector.type=elasticsearch
connector.version=6
format.type=json
schema.0.data-type=BIGINT
schema.0.name=hour_of_day
schema.1.data-type=BIGINT
schema.1.name=buy_cnt
update-mode=append"

Reply via email to