i have two questions.
1. when i add elasticserach host and port,i random write host, but not report
error. for example
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http"));
2. when i write conrrect elasticsearch host and port, but no response, also not
create index in elasticsearch
你好,我在使用flink-elasticsearch6-sink的功能,但是不能执行成功,存在以下两个问题。
1、当我随便写elasticsearch的hostname时,并不会有任何报错信息。例如
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http"));
填写的sdfsdfsf是不存在的。
2、当我使用docker启动一个elasticsearch时,程序并不会写入到elasticsearch中 ,但是也不报错。以下是代码附件。
package com.dufeng.test.connectors;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ElasticsearchSinkExample1 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
DataStream<Long> source = env.generateSequence(0, 10);
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http"));
httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));
httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));
ElasticsearchSink.Builder<Long> esSinkBuilder =
new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<Long>() {
public IndexRequest createIndexRequest(String
element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
@Override
public void process(Long aLong, RuntimeContext
runtimeContext, RequestIndexer requestIndexer) {
requestIndexer.add(createIndexRequest(String.valueOf(aLong)));
}
}
);
esSinkBuilder.setBulkFlushMaxActions(1);
source.addSink(esSinkBuilder.build());
env.execute("Elasticsearch 6.x end to end sink test example");
}
}