你好,我在使用flink-elasticsearch6-sink的功能,但是不能执行成功,存在以下两个问题。
1、当我随便写elasticsearch的hostname时,并不会有任何报错信息。例如
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http"));
填写的sdfsdfsf是不存在的。


2、当我使用docker启动一个elasticsearch时,程序并不会写入到elasticsearch中 ,但是也不报错。以下是代码附件。

package com.dufeng.test.connectors;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ElasticsearchSinkExample1 {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        DataStream<Long> source =  env.generateSequence(0, 10);

        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http"));
        httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));
        httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));

        ElasticsearchSink.Builder<Long> esSinkBuilder =
                new ElasticsearchSink.Builder<>(
                        httpHosts,
                        new ElasticsearchSinkFunction<Long>() {
                            public IndexRequest createIndexRequest(String 
element) {
                                Map<String, String> json = new HashMap<>();
                                json.put("data", element);

                                return Requests.indexRequest()
                                        .index("my-index")
                                        .type("my-type")
                                        .source(json);
                            }


                            @Override
                            public void process(Long aLong, RuntimeContext 
runtimeContext, RequestIndexer requestIndexer) {
                                
requestIndexer.add(createIndexRequest(String.valueOf(aLong)));
                            }
                        }
                );

        esSinkBuilder.setBulkFlushMaxActions(1);

        source.addSink(esSinkBuilder.build());

        env.execute("Elasticsearch 6.x end to end sink test example");

    }
}

回复