Hi Madhu,

Great. Do you want to contribute it back via a GitHub pull request? If
not that's also fine. We will try look into the 2.0 connector next
week.

Best,
Max

On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota <madhukar.th...@gmail.com> wrote:
> i have created working connector for Elasticsearch 2.0 based on
> elasticsearch-flink connector. I am using it right now but i want official
> connector from flink.
>
> ElasticsearchSink.java
>
>
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.net.InetAddress;
> import java.net.UnknownHostException;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicReference;
>
> import org.elasticsearch.action.bulk.BulkItemResponse;
> import org.elasticsearch.action.bulk.BulkProcessor;
> import org.elasticsearch.action.bulk.BulkRequest;
> import org.elasticsearch.action.bulk.BulkResponse;
> import org.elasticsearch.action.index.IndexRequest;
> import org.elasticsearch.client.Client;
> import org.elasticsearch.client.transport.TransportClient;
> import org.elasticsearch.cluster.node.DiscoveryNode;
> import org.elasticsearch.common.settings.Settings;
> import org.elasticsearch.common.transport.InetSocketTransportAddress;
> import org.elasticsearch.common.unit.ByteSizeUnit;
> import org.elasticsearch.common.unit.ByteSizeValue;
> import org.elasticsearch.common.unit.TimeValue;
>
>
> public class ElasticsearchSink<T> extends RichSinkFunction<T> {
>
>     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS =
> "bulk.flush.max.actions";
>     public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB =
> "bulk.flush.max.size.mb";
>     public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS =
> "bulk.flush.interval.ms";
>
>     private static final long serialVersionUID = 1L;
>     private static final int DEFAULT_PORT = 9300;
>     private static final Logger LOG =
> LoggerFactory.getLogger(ElasticsearchSink.class);
>
>     /**
>      * The user specified config map that we forward to Elasticsearch when
> we create the Client.
>      */
>     private final Map<String, String> userConfig;
>
>     /**
>      * The builder that is used to construct an {@link IndexRequest} from
> the incoming element.
>      */
>     private final IndexRequestBuilder<T> indexRequestBuilder;
>
>     /**
>      * The Client that was either retrieved from a Node or is a
> TransportClient.
>      */
>     private transient Client client;
>
>     /**
>      * Bulk processor that was created using the client
>      */
>     private transient BulkProcessor bulkProcessor;
>
>     /**
>      * This is set from inside the BulkProcessor listener if there where
> failures in processing.
>      */
>     private final AtomicBoolean hasFailure = new AtomicBoolean(false);
>
>     /**
>      * This is set from inside the BulkProcessor listener if a Throwable was
> thrown during processing.
>      */
>     private final AtomicReference<Throwable> failureThrowable = new
> AtomicReference<Throwable>();
>
>     public ElasticsearchSink(Map<String, String> userConfig,
> IndexRequestBuilder<T> indexRequestBuilder) {
>         this.userConfig = userConfig;
>         this.indexRequestBuilder = indexRequestBuilder;
>     }
>
>
>     @Override
>     public void open(Configuration configuration) {
>
>         ParameterTool params = ParameterTool.fromMap(userConfig);
>         Settings settings = Settings.settingsBuilder()
>                 .put(userConfig)
>                 .build();
>
>         TransportClient transportClient =
> TransportClient.builder().settings(settings).build();
>         for (String server : params.get("esHost").split(";"))
>         {
>             String[] components = server.trim().split(":");
>             String host = components[0];
>             int port = DEFAULT_PORT;
>             if (components.length > 1)
>             {
>                 port = Integer.parseInt(components[1]);
>             }
>
>             try {
>                 transportClient = transportClient.addTransportAddress(new
> InetSocketTransportAddress(InetAddress.getByName(host), port));
>             } catch (UnknownHostException e) {
>                 e.printStackTrace();
>             }
>         }
>
>         List<DiscoveryNode> nodes = transportClient.connectedNodes();
>         if (nodes.isEmpty()) {
>             throw new RuntimeException("Client is not connected to any
> Elasticsearch nodes!");
>         } else {
>             if (LOG.isDebugEnabled()) {
>                 LOG.info("Connected to nodes: " + nodes.toString());
>             }
>         }
>         client = transportClient;
>
>         BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
>                 client,
>                 new BulkProcessor.Listener() {
>                     public void beforeBulk(long executionId,
>                                            BulkRequest request) {
>
>                     }
>
>                     public void afterBulk(long executionId,
>                                           BulkRequest request,
>                                           BulkResponse response) {
>                         if (response.hasFailures()) {
>                             for (BulkItemResponse itemResp :
> response.getItems()) {
>                                 if (itemResp.isFailed()) {
>                                     LOG.error("Failed to index document in
> Elasticsearch: " + itemResp.getFailureMessage());
>                                     failureThrowable.compareAndSet(null, new
> RuntimeException(itemResp.getFailureMessage()));
>                                 }
>                             }
>                             hasFailure.set(true);
>                         }
>                     }
>
>                     public void afterBulk(long executionId,
>                                           BulkRequest request,
>                                           Throwable failure) {
>                         LOG.error(failure.getMessage());
>                         failureThrowable.compareAndSet(null, failure);
>                         hasFailure.set(true);
>                     }
>                 });
>
>         // This makes flush() blocking
>         bulkProcessorBuilder.setConcurrentRequests(0);
>
>
>
>         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
>
> bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
>         }
>
>         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
>             bulkProcessorBuilder.setBulkSize(new
> ByteSizeValue(params.getInt(
>                     CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
>         }
>
>         if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
>
> bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
>         }
>
>         bulkProcessor = bulkProcessorBuilder.build();
>     }
>
>
>     @Override
>     public void invoke(T element) {
>         IndexRequest indexRequest =
> indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
>
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Emitting IndexRequest: {}", indexRequest);
>         }
>
>         bulkProcessor.add(indexRequest);
>     }
>
>     @Override
>     public void close() {
>         if (bulkProcessor != null) {
>             bulkProcessor.close();
>             bulkProcessor = null;
>         }
>
>         if (client != null) {
>             client.close();
>         }
>
>         if (hasFailure.get()) {
>             Throwable cause = failureThrowable.get();
>             if (cause != null) {
>                 throw new RuntimeException("An error occured in
> ElasticsearchSink.", cause);
>             } else {
>                 throw new RuntimeException("An error occured in
> ElasticsearchSink.");
>
>             }
>         }
>     }
>
> }
>
>
> In my Main Class:
>
>
> Map<String, String> config = Maps.newHashMap();
>
> //Elasticsearch Parameters
>
> config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
> parameter.get("elasticsearch.bulk.flush.max.actions","1"));
> config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS,
> parameter.get("elasticsearch.bulk.flush.interval.ms","2"));
> config.put("cluster.name", parameter.get("elasticsearch.cluster.name"));
> config.put("esHost", parameter.get("elasticsearch.server",
> "localhost:9300"));
>
>
> DataStreamSink<String> elastic = messageStream.rebalance().addSink(new
> ElasticsearchSink<>(config, (IndexRequestBuilder<String>) (element,
> runtimeContext) -> {
>     String[] line = element.toLowerCase().split("
> +(?=(?:([^\"]*\"){2})*[^\"]*$)");
>     String measureAndTags = line[0];
>     String[] kvSplit = line[1].split("=");
>     String fieldName = kvSplit[0];
>     String fieldValue = kvSplit[1];
>     Map<String, String> tags = new HashMap<>();
>     String measure = parseMeasureAndTags(measureAndTags, tags);
>     long time = (long) (Double.valueOf(line[2]) / 1000000);
>
>     Map<String, Object> test = new HashMap<>();
>     DateFormat dateFormat = new
> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>     dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
>
>     test.put(fieldName, setValue(fieldValue));
>     test.put("tags", tags);
>     test.put("measurement", measure);
>     test.put("@timestamp", dateFormat.format(new Date(time)));
>
>     return Requests.indexRequest()
>             .index("metrics")
>             .type("test")
>             .source(new Gson().toJson(test).toLowerCase());
>
>
> }));
>
>
> -Madhu
>
>
> On Fri, Dec 4, 2015 at 9:18 AM, Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Madhu,
>>
>> Not yet. The API has changed slightly. We'll add one very soon. In the
>> meantime I've created an issue to keep track of the status:
>>
>> https://issues.apache.org/jira/browse/FLINK-3115
>>
>> Thanks,
>> Max
>>
>> On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota
>> <madhukar.th...@gmail.com> wrote:
>> > is current elasticsearch-flink connector support elasticsearch 2.x
>> > version?
>> >
>> > -Madhu
>
>

Reply via email to