// Example: sink to ElasticSearch (ES).
// NOTE: the remainder of the original header was lost to a character-encoding error.
package etl.estest;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import java.util.*;
/**
 * Sample Flink streaming job: consumes string records from a Kafka topic, prints each
 * record, and hands each record to an Elasticsearch sink as a single-field document.
 */
public class EsTest1 {

    /** Kafka topic the job consumes from. */
    private static final String SOURCE_TOPIC = "nms.pas";

    /** Elasticsearch index that every record is written to. */
    private static final String TARGET_INDEX = "pas-meeting-data-2020.1.17";

    /** Document type set on every index request. */
    private static final String DOCUMENT_TYPE = "_doc";

    /** Name of the single document field that carries the raw record. */
    private static final String PAYLOAD_FIELD = "data";

    /**
     * Entry point; delegates to {@link #test2()}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        test2();
    }

    /**
     * Builds and executes the Kafka-to-Elasticsearch pipeline with parallelism 1.
     *
     * @throws Exception if job execution fails
     */
    private static void test2() throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // Kafka consumer configuration.
        final Properties kafkaConfig = new Properties();
        kafkaConfig.setProperty("bootstrap.servers", "10.67.18.100:9092");
        kafkaConfig.setProperty("zookeeper.connect", "10.67.18.100:2180");
        kafkaConfig.setProperty("group.id", "test-consumer-group");

        final FlinkKafkaConsumer<String> kafkaSource =
                new FlinkKafkaConsumer<>(SOURCE_TOPIC, new SimpleStringSchema(), kafkaConfig);
        final DataStreamSource<String> records = environment.addSource(kafkaSource);

        // Print sink is registered before the Elasticsearch sink, as in the original wiring.
        records.print();

        final List<HttpHost> elasticHosts = new ArrayList<>();
        elasticHosts.add(new HttpHost("10.67.18.100", 9310, "http"));

        final ElasticsearchSink.Builder<String> sinkBuilder =
                new ElasticsearchSink.Builder<>(elasticHosts, new RecordIndexer());
        records.addSink(sinkBuilder.build());

        environment.execute("sss");
    }

    /**
     * Sink function that wraps each incoming string in a one-entry map and queues an
     * index request for it.
     */
    private static final class RecordIndexer implements ElasticsearchSinkFunction<String> {

        /**
         * Queues one index request for the given record.
         *
         * @param record the record taken from the stream
         * @param runtimeContext the runtime context supplied by the sink (unused)
         * @param requestIndexer collector that the index request is added to
         */
        @Override
        public void process(
                String record, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            requestIndexer.add(toIndexRequest(record));
        }

        /**
         * Builds the index request for one record.
         *
         * @param record the raw record stored under the payload field
         * @return an index request targeting the configured index and type
         */
        private IndexRequest toIndexRequest(String record) {
            final Map<String, String> document = new HashMap<>();
            document.put(PAYLOAD_FIELD, record);

            final IndexRequest request = Requests.indexRequest();
            request.index(TARGET_INDEX);
            request.type(DOCUMENT_TYPE);
            request.source(document);
            return request;
        }
    }
}