[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895633#comment-15895633 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user rmetzger commented on the issue:
https://github.com/apache/flink/pull/2767

    I closed the PR manually using another commit.

> Elasticsearch 5.x support
> -------------------------
>
>                 Key: FLINK-4988
>                 URL: https://issues.apache.org/jira/browse/FLINK-4988
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>            Reporter: Mike Dias
>             Fix For: 1.3.0
>
> Elasticsearch 5.x was released:
> https://www.elastic.co/blog/elasticsearch-5-0-0-released

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895631#comment-15895631 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/2767
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15856148#comment-15856148 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/2767

    Hi @mikedias! Your contribution has been merged with
    8699b03d79a441ca33d9f62b96490d29a0efaf44 and
    b5caaef82add4a6f424094d526700c77b011724e.
    Could you manually close this PR? The bot only closed the new restructure PR.
    Thanks a lot for your contribution!
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15856140#comment-15856140 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/3112
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15856129#comment-15856129 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3112#discussion_r99837460

--- Diff: docs/dev/connectors/elasticsearch.md ---

@@ -23,158 +23,291 @@ specific language governing permissions and limitations under the License. -->

-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+<table>
+  <thead>
+    <tr>
+      <th>Maven Dependency</th>
+      <th>Supported since</th>
+      <th>Elasticsearch version</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
+      <td>1.0.0</td>
+      <td>1.x</td>
+    </tr>
+    <tr>
+      <td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
+      <td>1.0.0</td>
+      <td>2.x</td>
+    </tr>
+    <tr>
+      <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
+      <td>1.2.0</td>
+      <td>5.x</td>
+    </tr>
+  </tbody>
+</table>

 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
+about how to package the program with the libraries for cluster execution.

 Installing Elasticsearch

 Instructions for setting up an Elasticsearch cluster can be found
 [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against your cluster.

 Elasticsearch Sink

-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
-
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.

-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:

 {% highlight java %}
 DataStream<String> input = ...;

-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");

-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, String> json = new HashMap<>();
-        json.put("data", element);
+List<InetSocketTransportAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, String> json = new HashMap<>();
+        json.put("data", element);
+
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
 {% endhighlight %}

+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config
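The reviewed docs set `bulk.flush.max.actions` to `"1"` so the sink emits after every element instead of buffering. As a minimal sketch of that buffering behaviour, assuming only the semantics stated in the docs comment (the `BulkBuffer` class and its methods here are hypothetical stand-ins for illustration, not part of the connector API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in: actions accumulate until the configured
// flush-max-actions threshold is reached, then the batch is flushed.
class BulkBuffer {
    private final int flushMaxActions;
    private final Consumer<List<String>> flusher;
    private final List<String> buffer = new ArrayList<>();

    BulkBuffer(int flushMaxActions, Consumer<List<String>> flusher) {
        this.flushMaxActions = flushMaxActions;
        this.flusher = flusher;
    }

    void add(String action) {
        buffer.add(action);
        if (buffer.size() >= flushMaxActions) {
            // Hand off a copy of the batch and start buffering anew.
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    int pending() {
        return buffer.size();
    }
}
```

With a threshold of 1, as in the docs example, every `add` triggers an immediate flush, which is why the docs call the setting out explicitly for the example to behave without buffering delay.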
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15856130#comment-15856130 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/3112

    Merging to `master` ..
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855688#comment-15855688 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/3112

    Thank you for the review! I'll address the missing semicolons and merge this today.
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855671#comment-15855671 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user rmetzger commented on the issue:
https://github.com/apache/flink/pull/3112

    +1 to merge. I've tried the ES2 connector (just to check one of the connectors) and it worked well.
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855662#comment-15855662 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/3112#discussion_r99783050

--- Diff: docs/dev/connectors/elasticsearch.md ---
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855663#comment-15855663 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/3112#discussion_r99783072

--- Diff: docs/dev/connectors/elasticsearch.md ---
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855661#comment-15855661 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/3112#discussion_r99782995

--- Diff: docs/dev/connectors/elasticsearch.md ---

+config.put("cluster.name", "my-cluster-name")

--- End diff ---

    There's a semicolon missing
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15847758#comment-15847758 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user fpompermaier commented on the issue:
https://github.com/apache/flink/pull/3112

    Hi @tzulitai, congrats for the great work! For my use case it is important
    also to be resilient to malformed documents
    (https://issues.apache.org/jira/browse/FLINK-5353).
    Do you think you could address that issue as well? That would be awesome for us :)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15844214#comment-15844214 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user mikedias commented on a diff in the pull request:
https://github.com/apache/flink/pull/3112#discussion_r98340492

--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>This class implements the common behaviour across Elasticsearch versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * <p>The version specific behaviours for creating a {@link Client} to connect to a Elasticsearch cluster
+ * should be defined by concrete implementations of a {@link ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+	private static final long serialVersionUID = -1007596293618451942L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+	// ------------------------------------------------------------------------
+	//  Internal bulk processor configuration
+	// ------------------------------------------------------------------------
+
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+	private final Integer bulkProcessorFlushMaxActions;
+	private final Integer bulkProcessorFlushMaxSizeMb;
+	private final Integer bulkProcessorFlushIntervalMillis;
+
+	// ------------------------------------------------------------------------
+	//  User-facing API and configuration
+	// ------------------------------------------------------------------------
+
+	/** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+	private final Map<String, String> userConfig;
+
+	/** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
+	private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+	/** Provided to the user via the {@link
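The quoted base class declares three optional string config keys (`bulk.flush.max.actions`, `bulk.flush.max.size.mb`, `bulk.flush.interval.ms`) alongside nullable `Integer` fields. A plain-Java sketch of how such keys might be pulled out of the user config map, leaving `null` where a key is absent so defaults can apply; the `BulkFlushSettings` helper is an illustrative guess, not the connector's actual implementation:

```java
import java.util.Map;

// Illustrative helper (assumed behaviour, not the actual connector code):
// each bulk-flush setting is optional, so a missing key yields null.
class BulkFlushSettings {
    static final String KEY_MAX_ACTIONS = "bulk.flush.max.actions";
    static final String KEY_MAX_SIZE_MB = "bulk.flush.max.size.mb";
    static final String KEY_INTERVAL_MS = "bulk.flush.interval.ms";

    final Integer maxActions;
    final Integer maxSizeMb;
    final Integer intervalMillis;

    BulkFlushSettings(Map<String, String> userConfig) {
        this.maxActions = parse(userConfig, KEY_MAX_ACTIONS);
        this.maxSizeMb = parse(userConfig, KEY_MAX_SIZE_MB);
        this.intervalMillis = parse(userConfig, KEY_INTERVAL_MS);
    }

    private static Integer parse(Map<String, String> config, String key) {
        String value = config.get(key);
        return value == null ? null : Integer.valueOf(value);
    }
}
```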
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15844213#comment-15844213 ]

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user mikedias commented on a diff in the pull request:
https://github.com/apache/flink/pull/3112#discussion_r98340529

--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15844212#comment-15844212 ] ASF GitHub Bot commented on FLINK-4988: --- Github user mikedias commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r98340549 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.InstantiationUtil; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Base class for all Flink Elasticsearch Sinks. + * + * + * This class implements the common behaviour across Elasticsearch versions, such as + * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before + * sending the requests to the cluster, as well as passing input records to the user provided + * {@link ElasticsearchSinkFunction} for processing. + * + * + * The version specific behaviours for creating a {@link Client} to connect to a Elasticsearch cluster + * should be defined by concrete implementations of a {@link ElasticsearchClientFactory}, which is to be provided to the + * constructor of this class. 
+ * + * @param <T> Type of the elements handled by this sink + */ +public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { + + private static final long serialVersionUID = -1007596293618451942L; + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class); + + // + // Internal bulk processor configuration + // + + public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; + public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; + public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; + + private final Integer bulkProcessorFlushMaxActions; + private final Integer bulkProcessorFlushMaxSizeMb; + private final Integer bulkProcessorFlushIntervalMillis; + + // + // User-facing API and configuration + // + + /** The user-specified config map that we forward to Elasticsearch when we create the {@link Client}. */ + private final Map<String, String> userConfig; + + /** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */ + private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction; + + /** Provided to the user via the {@link
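For reference, the three `CONFIG_KEY_BULK_FLUSH_*` keys quoted above are plain string keys that end up in the same user config map as the Elasticsearch client settings. A minimal sketch of assembling such a map (the values are illustrative, and the sink construction itself is elided because it is version-specific):

```java
import java.util.HashMap;
import java.util.Map;

public class SinkConfigSketch {

    public static Map<String, String> exampleUserConfig() {
        Map<String, String> config = new HashMap<>();
        // Forwarded to the Elasticsearch Client when the sink opens
        config.put("cluster.name", "my-cluster-name");
        // Bulk processor tuning, using the keys defined in ElasticsearchSinkBase
        config.put("bulk.flush.max.actions", "1000");  // flush after 1000 buffered requests
        config.put("bulk.flush.max.size.mb", "5");     // ...or once 5 MB are buffered
        config.put("bulk.flush.interval.ms", "60000"); // ...or at least once a minute
        return config;
    }

    public static void main(String[] args) {
        System.out.println(exampleUserConfig().size()); // 4 entries
    }
}
```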
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824325#comment-15824325 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r96274454 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824230#comment-15824230 ] ASF GitHub Bot commented on FLINK-4988: --- Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r96265039 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824073#comment-15824073 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3112 @rmetzger The comments are all addressed now, including the last two problems. > Elasticsearch 5.x support > - > > Key: FLINK-4988 > URL: https://issues.apache.org/jira/browse/FLINK-4988 > Project: Flink > Issue Type: New Feature >Reporter: Mike Dias > > Elasticsearch 5.x was released: > https://www.elastic.co/blog/elasticsearch-5-0-0-released -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823726#comment-15823726 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3112 Gordon and I had a quick offline chat about this and decided the following: 1. We'll use the `ServiceResourceTransformer` because that's the correct way of solving the problem 2. There is a log4j2 to SLF4J bridge we'll try to use.
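The two decisions above could look roughly like the following POM fragments (a hedged sketch, not code from the PR; note that the Maven Shade plugin's actual class is named `ServicesResourceTransformer`, and the bridge version shown is an assumption):

```xml
<!-- 1. Merge META-INF/services entries from all shaded jars instead of
     keeping an arbitrary one (goes inside the shade plugin's <configuration>) -->
<transformers>
  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>

<!-- 2. Route the ES client's Log4j 2 calls to SLF4J (version is an assumption) -->
<dependency>
  <groupId>org.apache.logging.log4j</groupId>
  <artifactId>log4j-to-slf4j</artifactId>
  <version>2.7</version>
</dependency>
```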
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823624#comment-15823624 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3112 Hi @rmetzger, I've addressed your review comments. I still have some problems, however, with the following: 1. I can't really reproduce the problem with the conflicts in `META-INF/services` dependencies, hence couldn't really try out whether that actually works correctly or whether it would actually use the `ServiceResourceTransformer` instead of the `AppenderTransformer`. From the Maven documentation and what I understand of what the original ES2 documentation was trying to address, I think we can use the `ServiceResourceTransformer`. What do you think? 2. Regarding the Log4j 2 dependency: that is required for the ES 5 Java client to log correctly, since the ES 5 Java API now uses Log4j 2 and does not detect logging implementations; it must have the Log4j 2 API in the classpath. So the way I think the ES 5 connector works is that the connector and Flink itself log using Log4j 1, while the internally used ES Java client uses Log4j 2, included exclusively in ES 5's POM. I am still figuring out how to get the internal ES Java client in the ES 5 connector to log to TaskManager logs when using cluster execution, though (the connector log and the Flink log are correctly written; only the ES Java client log is missing). I've included this `log4j2.properties` in a test project to be packaged for execution:
```
appender.file.type=File
appender.file.filename=${log.file}
appender.file.name=file
appender.file.layout.type=PatternLayout
appender.file.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
rootLogger.level=info
rootLogger.appenderRef.file.ref=file
```
Somehow, it isn't picking up the `log.file` property, which is set by the `flink-daemon.sh` script as a system property. 
Changing `log.file` to another specific file path works.
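A plausible cause, offered here as an assumption rather than something confirmed in the thread: Log4j 2 property files resolve JVM system properties through the `sys:` lookup prefix, so a bare `${log.file}` resolves nothing while a hard-coded path naturally works. The filename line would then need to be:

```properties
# Log4j 2 reads system properties via the sys: lookup, not a bare ${...}
appender.file.filename=${sys:log.file}
```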
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821662#comment-15821662 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95979696 --- Diff: .travis.yml ---
@@ -16,14 +16,14 @@ matrix: include:
 # Always run test groups A and B together
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
+  env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,include-elasticsearch5 -Dmaven.javadoc.skip=true"
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
+  env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis,include-elasticsearch5 -Dmaven.javadoc.skip=true"
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.6.3 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
+  env: PROFILE="-Dhadoop.version=2.6.3 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,include-elasticsearch5 -Dmaven.javadoc.skip=true"
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.6.3 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
+  env: PROFILE="-Dhadoop.version=2.6.3 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis,include-elasticsearch5 -Dmaven.javadoc.skip=true"
--- End diff -- With the automatic activation, you don't need these changes (see my other comment at the profile def)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821661#comment-15821661 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95979634 --- Diff: flink-connectors/pom.xml --- @@ -85,6 +86,17 @@ under the License. flink-connector-kinesis + + + + include-elasticsearch5 --- End diff -- Can you make this a profile that activates itself automatically when java 8 is available? http://maven.apache.org/guides/introduction/introduction-to-profiles.html
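A profile of the kind requested might look like the following sketch (the module name follows the connector naming used elsewhere in this PR; `<jdk>1.8</jdk>` is Maven's built-in JDK-based activation):

```xml
<!-- Sketch: the profile activates itself on JDK 8 instead of having to be
     listed explicitly in every CI invocation -->
<profile>
  <id>include-elasticsearch5</id>
  <activation>
    <jdk>1.8</jdk>
  </activation>
  <modules>
    <module>flink-connector-elasticsearch5</module>
  </modules>
</profile>
```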
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821655#comment-15821655 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95979234 --- Diff: flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java --- @@ -17,217 +17,51 @@ */ package org.apache.flink.streaming.connectors.elasticsearch2; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; -import org.junit.Assert; -import org.junit.ClassRule; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import java.io.File; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -public class ElasticsearchSinkITCase 
extends StreamingMultipleProgramsTestBase { - - private static final int NUM_ELEMENTS = 20; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase { @Test public void testTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); - - Node node = NodeBuilder.nodeBuilder() - .settings(Settings.settingsBuilder() - .put("path.home", dataDir.getParent()) - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-transport-client-cluster") - .node(); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-transport-client-cluster"); - - // Can't use {@link TransportAddress} as it's not Serializable in Elasticsearch 2.x - List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch TransportClient Test"); - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); - } - - node.close(); + runTransportClientTest(); } - @Test(expected = IllegalArgumentException.class) - public void testNullTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); - - Node node =
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821654#comment-15821654 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95979205 --- Diff: docs/dev/connectors/elasticsearch2.md --- @@ -1,173 +0,0 @@ --- End diff -- Will do!
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821651#comment-15821651 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95979032 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
+ private final Integer bulkProcessorFlushMaxActions;
+ private final Integer bulkProcessorFlushMaxSizeMb;
+ private final Integer bulkProcessorFlushIntervalMillis;
--- End diff -- Ah, I see. You need boxed types for nullability. Then I would actually keep it as is.
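The point about boxed types can be illustrated with a short standalone sketch (hypothetical helper names, not code from the PR): a `null`-valued `Integer` field can encode "the user did not configure this flush parameter, keep the `BulkProcessor` default", which a primitive `int` cannot express.

```java
import java.util.HashMap;
import java.util.Map;

public class FlushConfigSketch {

    /** Returns the parsed value, or null when the key was not configured. */
    static Integer parseOptionalInt(Map<String, String> userConfig, String key) {
        String raw = userConfig.remove(key); // consume the key as it is parsed
        return raw == null ? null : Integer.valueOf(raw);
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("bulk.flush.max.actions", "1");

        Integer maxActions = parseOptionalInt(userConfig, "bulk.flush.max.actions");
        Integer intervalMs = parseOptionalInt(userConfig, "bulk.flush.interval.ms");

        // maxActions is 1; intervalMs is null, i.e. "keep the BulkProcessor default"
        System.out.println(maxActions);
        System.out.println(intervalMs);
    }
}
```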
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821643#comment-15821643 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95978090 --- Diff: flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java ---
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821644#comment-15821644 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95978123 --- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,284 @@ specific language governing permissions and limitations under the License. -->
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+| Maven Dependency | Supported since | Elasticsearch version |
+| ---------------- | --------------- | --------------------- |
+| flink-connector-elasticsearch{{ site.scala_version_suffix }} | 1.0.0 | 1.x |
+| flink-connector-elasticsearch2{{ site.scala_version_suffix }} | 1.0.0 | 2.x |
+| flink-connector-elasticsearch5{{ site.scala_version_suffix }} | 1.2.0 | 5.x |
+
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking) for information
+about how to package the program with the libraries for cluster execution.

Installing Elasticsearch

Instructions for setting up an Elasticsearch cluster can be found [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html). Make sure to set and remember a cluster name. 
This must be set when -creating a Sink for writing to your cluster +creating an `ElasticsearchSink` for requesting document actions against your cluster. Elasticsearch Sink -The connector provides a Sink that can send data to an Elasticsearch Index. - -The sink can use two different methods for communicating with Elasticsearch: - -1. An embedded Node -2. The TransportClient -See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html) -for information about the differences between the two modes. +The `ElasticsearchSink` uses a `TransportClient` to communicate with an +Elasticsearch cluster. -This code shows how to create a sink that uses an embedded Node for -communication: +The example below shows how to configure and create a sink: - + {% highlight java %} DataStream input = ...; -Mapconfig = Maps.newHashMap(); +Map config = new HashMap<>(); +config.put("cluster.name", "my-cluster-name") // This instructs the sink to emit after every element, otherwise they would be buffered config.put("bulk.flush.max.actions", "1"); -config.put("cluster.name", "my-cluster-name"); -input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder() { -@Override -public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { -Map json = new HashMap<>(); -json.put("data", element); +List transportAddresses = new ArrayList(); +transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300)); +transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300)); +input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction() { +public IndexRequest createIndexRequest(String element) { +Map json = new HashMap<>(); +json.put("data", element); + return Requests.indexRequest() .index("my-index") .type("my-type") .source(json); } + +@Override +public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { +indexer.add(createIndexRequest(element)); +} })); {% endhighlight %} 
- + +{% highlight java %} +DataStream input = ...; + +Map config = new
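The Java example in the quoted doc diff is truncated by the archiver and its generic type parameters were stripped. As a rough, self-contained sketch of just the configuration part (class and helper names below are illustrative, not from the PR; the actual `ElasticsearchSink`, transport-address, and sink-function types are omitted so this compiles without Elasticsearch on the classpath):

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the sink configuration shown in the (truncated) doc diff above.
// EsSinkConfigSketch is a hypothetical name; only the config/address setup is shown.
public class EsSinkConfigSketch {

    static Map<String, String> buildConfig() {
        Map<String, String> config = new HashMap<>();
        // Must match the name of the target Elasticsearch cluster.
        config.put("cluster.name", "my-cluster-name");
        // Flush after every element instead of buffering bulk requests.
        config.put("bulk.flush.max.actions", "1");
        return config;
    }

    static List<InetSocketAddress> buildTransportAddresses() {
        List<InetSocketAddress> addresses = new ArrayList<>();
        // 9300 is the transport port, not the HTTP port (9200).
        addresses.add(new InetSocketAddress("127.0.0.1", 9300));
        addresses.add(new InetSocketAddress("10.2.3.1", 9300));
        return addresses;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig());
        System.out.println(buildTransportAddresses().size());
    }
}
```

In the real connector these two values are handed to the `ElasticsearchSink` constructor together with an `ElasticsearchSinkFunction` that turns each record into an `IndexRequest`.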
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821642#comment-15821642 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95977827 --- Diff: flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.elasticsearch5; + +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase { + + @Test + public void testTransportClient() throws Exception { + runTransportClientTest(); + } + + @Test + public void testNullTransportClient() throws Exception { + runNullTransportClientTest(); + } + + @Test + public void testEmptyTransportClient() throws Exception { + runEmptyTransportClientTest(); + } + + @Test + public void testTransportClientFails() throws Exception { + runTransportClientFailsTest(); + } + + @Override + protected ElasticsearchSinkBase createElasticsearchSink(MapuserConfig, + List transportAddresses, + ElasticsearchSinkFunction elasticsearchSinkFunction) { --- End diff -- I'll double-check the styling in this PR nevertheless. > Elasticsearch 5.x support > - > > Key: FLINK-4988 > URL: https://issues.apache.org/jira/browse/FLINK-4988 > Project: Flink > Issue Type: New Feature >Reporter: Mike Dias > > Elasticsearch 5.x was released: > https://www.elastic.co/blog/elasticsearch-5-0-0-released -- This message was sent by Atlassian JIRA (v6.3.4#6332)
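The refactor under review here follows a template-method layout: the shared `ElasticsearchSinkTestBase` owns the version-agnostic test logic, and each connector version's ITCase only overrides a factory method. A toy, dependency-free sketch of that structure (all names below are illustrative stand-ins, not the real Flink classes):

```java
// Sketch of the refactor discussed in this thread: version-agnostic test logic
// lives in a shared base class, and each connector version supplies only a factory.
abstract class SinkTestBase {
    // Version-specific hook: each connector module overrides this.
    protected abstract String createSink();

    // Shared test logic, reused by every connector version.
    public final String runTransportClientTest() {
        return "ran against " + createSink();
    }
}

public class Es5SinkTestSketch extends SinkTestBase {
    @Override
    protected String createSink() {
        return "elasticsearch5-sink";
    }

    public static void main(String[] args) {
        System.out.println(new Es5SinkTestSketch().runTransportClientTest());
    }
}
```

The actual ITCases delegate in the same way: each `@Test` method is a one-line call into a `run...Test()` method on the base class.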
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821640#comment-15821640 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95977656 --- Diff: flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.elasticsearch5; + +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase { + + @Test + public void testTransportClient() throws Exception { + runTransportClientTest(); + } + + @Test + public void testNullTransportClient() throws Exception { + runNullTransportClientTest(); + } + + @Test + public void testEmptyTransportClient() throws Exception { + runEmptyTransportClientTest(); + } + + @Test + public void testTransportClientFails() throws Exception { + runTransportClientFailsTest(); + } + + @Override + protected ElasticsearchSinkBase createElasticsearchSink(Map userConfig, + List transportAddresses, + ElasticsearchSinkFunction elasticsearchSinkFunction) { --- End diff -- Do you mean an empty line? I don't think there is any trailing whitespace. It isn't actually an empty line; the method parameters are at the far right. You need to drag the horizontal scroll bar to see them, otherwise in the IDE they seem to be aligned. > Elasticsearch 5.x support > - > > Key: FLINK-4988 > URL: https://issues.apache.org/jira/browse/FLINK-4988 > Project: Flink > Issue Type: New Feature >Reporter: Mike Dias > > Elasticsearch 5.x was released: > https://www.elastic.co/blog/elasticsearch-5-0-0-released -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821634#comment-15821634 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95977306 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.InstantiationUtil; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Base class for all Flink Elasticsearch Sinks. + * + * + * This class implements the common behaviour across Elasticsearch versions, such as + * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before + * sending the requests to the cluster, as well as passing input records to the user provided + * {@link ElasticsearchSinkFunction} for processing. + * + * + * The version specific behaviours for creating a {@link Client} to connect to a Elasticsearch cluster + * should be defined by concrete implementations of a {@link ElasticsearchClientFactory}, which is to be provided to the + * constructor of this class. 
+ * + * @param Type of the elements emitted by this sink + */ +public abstract class ElasticsearchSinkBase extends RichSinkFunction { + + private static final long serialVersionUID = -1007596293618451942L; + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class); + + // + // Internal bulk processor configuration + // + + public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; + public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; + public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; + + private final Integer bulkProcessorFlushMaxActions; + private final Integer bulkProcessorFlushMaxSizeMb; + private final Integer bulkProcessorFlushIntervalMillis; --- End diff -- I'll change this to `int` and use special values to represent that the user hasn't set a value. > Elasticsearch 5.x support > - > > Key: FLINK-4988 > URL: https://issues.apache.org/jira/browse/FLINK-4988 > Project: Flink > Issue Type: New Feature >Reporter: Mike Dias > > Elasticsearch 5.x was released: > https://www.elastic.co/blog/elasticsearch-5-0-0-released -- This message was sent by Atlassian JIRA (v6.3.4#6332)
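The fix described above, replacing the boxed `Integer` fields with `int` plus a reserved "unset" value, can be sketched as follows (class and field names are hypothetical, not the actual Flink code; only the `bulk.flush.max.actions` key is shown):

```java
// Sketch of the "primitive int + sentinel" pattern discussed above:
// instead of a boxed Integer that is null when the user set nothing,
// use an int with a reserved value (-1 here) meaning "not configured".
public class BulkFlushSettingsSketch {

    static final int NOT_SET = -1; // sentinel: user did not configure this key

    final int flushMaxActions;

    BulkFlushSettingsSketch(java.util.Map<String, String> userConfig) {
        String raw = userConfig.get("bulk.flush.max.actions");
        this.flushMaxActions = (raw == null) ? NOT_SET : Integer.parseInt(raw);
    }

    boolean isFlushMaxActionsConfigured() {
        return flushMaxActions != NOT_SET;
    }

    public static void main(String[] args) {
        java.util.Map<String, String> cfg = new java.util.HashMap<>();
        cfg.put("bulk.flush.max.actions", "1");
        System.out.println(new BulkFlushSettingsSketch(cfg).flushMaxActions);
        System.out.println(new BulkFlushSettingsSketch(new java.util.HashMap<>()).flushMaxActions);
    }
}
```

The sentinel approach avoids accidental `NullPointerException`s from auto-unboxing, at the cost of reserving one value (here, `-1`, which is not a meaningful flush threshold anyway).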
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821605#comment-15821605 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95969123 --- Diff: docs/dev/connectors/elasticsearch2.md --- @@ -1,173 +0,0 @@ -title: "Elasticsearch 2.x Connector" -nav-title: Elasticsearch 2.x -nav-parent_id: connectors -nav-pos: 5 - - -This connector provides a Sink that can write to an -[Elasticsearch 2.x](https://elastic.co/) Index. To use this connector, add the -following dependency to your project: - -{% highlight xml %} - - org.apache.flink - flink-connector-elasticsearch2{{ site.scala_version_suffix }} - {{site.version }} - -{% endhighlight %} - -Note that the streaming connectors are currently not part of the binary -distribution. See -[here]({{site.baseurl}}/dev/linking) -for information about how to package the program with the libraries for -cluster execution. - - Installing Elasticsearch 2.x - -Instructions for setting up an Elasticsearch cluster can be found -[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html). -Make sure to set and remember a cluster name. This must be set when -creating a Sink for writing to your cluster - - Elasticsearch 2.x Sink -The connector provides a Sink that can send data to an Elasticsearch 2.x Index. - -The sink communicates with Elasticsearch via Transport Client - -See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html) -for information about the Transport Client. 
- -The code below shows how to create a sink that uses a `TransportClient` for communication: - - - -{% highlight java %} -File dataDir = ; - -DataStream input = ...; - -Mapconfig = new HashMap<>(); -// This instructs the sink to emit after every element, otherwise they would be buffered -config.put("bulk.flush.max.actions", "1"); -config.put("cluster.name", "my-cluster-name"); - -List transports = new ArrayList<>(); -transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); -transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300)); - -input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction() { - public IndexRequest createIndexRequest(String element) { -Map json = new HashMap<>(); -json.put("data", element); - -return Requests.indexRequest() -.index("my-index") -.type("my-type") -.source(json); - } - - @Override - public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { -indexer.add(createIndexRequest(element)); - } -})); -{% endhighlight %} - - -{% highlight scala %} -val dataDir = ; - -val input: DataStream[String] = ... 
- -val config = new util.HashMap[String, String] -config.put("bulk.flush.max.actions", "1") -config.put("cluster.name", "my-cluster-name") - -val transports = new ArrayList[String] -transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)) -transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300)); - -input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction[String] { - def createIndexRequest(element: String): IndexRequest = { -val json = new util.HashMap[String, AnyRef] -json.put("data", element) -Requests.indexRequest.index("my-index").`type`("my-type").source(json) - } - - override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) { -indexer.add(createIndexRequest(element)) - } -})) -{% endhighlight %} - - - -A Map of Strings is used to configure the Sink. The configuration keys -are documented in the Elasticsearch documentation -[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html). -Especially important is the `cluster.name`. parameter that must correspond to -the name of your cluster and with ElasticSearch 2x you also need to specify `path.home`. - -Internally, the sink uses a `BulkProcessor` to send Action requests to the cluster. -This will buffer elements and Action Requests before sending to the cluster. The behaviour of the -`BulkProcessor` can be configured using these config keys: - * **bulk.flush.max.actions**: Maximum amount of elements to
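The list of `BulkProcessor` tuning keys is cut off above. Judging from the `ElasticsearchSinkBase` constants quoted elsewhere in this thread, the three keys are `bulk.flush.max.actions`, `bulk.flush.max.size.mb`, and `bulk.flush.interval.ms`. A minimal sketch of a config map using them (the values are illustrative, not recommendations):

```java
import java.util.HashMap;
import java.util.Map;

// The three BulkProcessor tuning keys defined in ElasticsearchSinkBase.
// BulkFlushKeysSketch is a hypothetical name used for illustration.
public class BulkFlushKeysSketch {
    public static Map<String, String> exampleConfig() {
        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "100");  // flush after this many buffered action requests
        config.put("bulk.flush.max.size.mb", "5");    // flush once buffered requests reach this size
        config.put("bulk.flush.interval.ms", "1000"); // flush at this interval regardless of count/size
        return config;
    }

    public static void main(String[] args) {
        System.out.println(exampleConfig());
    }
}
```

Whichever threshold is hit first triggers a bulk flush; setting `bulk.flush.max.actions` to `"1"`, as the docs' examples do, effectively disables buffering.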
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821607#comment-15821607 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95968855 --- Diff: docs/dev/connectors/elasticsearch.md --- @@ -23,158 +23,284 @@ specific language governing permissions and limitations under the License. --> -This connector provides a Sink that can write to an -[Elasticsearch](https://elastic.co/) Index. To use this connector, add the -following dependency to your project: - -{% highlight xml %} - - org.apache.flink - flink-connector-elasticsearch{{ site.scala_version_suffix }} - {{site.version }} - -{% endhighlight %} +This connector provides sinks that can request document actions to an +[Elasticsearch](https://elastic.co/) Index. To use this connector, add one +of the following dependencies to your project, depending on the version +of the Elasticsearch installation: + + + + + Maven Dependency + Supported since + Elasticsearch version + + + + +flink-connector-elasticsearch{{ site.scala_version_suffix }} +1.0.0 +1.x + + +flink-connector-elasticsearch2{{ site.scala_version_suffix }} +1.0.0 +2.x + + +flink-connector-elasticsearch5{{ site.scala_version_suffix }} +1.2.0 +5.x + + + Note that the streaming connectors are currently not part of the binary -distribution. See -[here]({{site.baseurl}}/dev/linking) -for information about how to package the program with the libraries for -cluster execution. +distribution. See [here]({{site.baseurl}}/dev/linking) for information +about how to package the program with the libraries for cluster execution. Installing Elasticsearch Instructions for setting up an Elasticsearch cluster can be found [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html). Make sure to set and remember a cluster name. 
This must be set when -creating a Sink for writing to your cluster +creating an `ElasticsearchSink` for requesting document actions against your cluster. Elasticsearch Sink -The connector provides a Sink that can send data to an Elasticsearch Index. - -The sink can use two different methods for communicating with Elasticsearch: - -1. An embedded Node -2. The TransportClient -See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html) -for information about the differences between the two modes. +The `ElasticsearchSink` uses a `TransportClient` to communicate with an +Elasticsearch cluster. -This code shows how to create a sink that uses an embedded Node for -communication: +The example below shows how to configure and create a sink: - + {% highlight java %} DataStream input = ...; -Mapconfig = Maps.newHashMap(); +Map config = new HashMap<>(); +config.put("cluster.name", "my-cluster-name") // This instructs the sink to emit after every element, otherwise they would be buffered config.put("bulk.flush.max.actions", "1"); -config.put("cluster.name", "my-cluster-name"); -input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder() { -@Override -public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { -Map json = new HashMap<>(); -json.put("data", element); +List transportAddresses = new ArrayList(); +transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300)); +transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300)); +input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction() { +public IndexRequest createIndexRequest(String element) { +Map json = new HashMap<>(); +json.put("data", element); + return Requests.indexRequest() .index("my-index") .type("my-type") .source(json); } + +@Override +public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { +indexer.add(createIndexRequest(element)); +} })); {% endhighlight %} 
- + +{% highlight java %} +DataStream input = ...; + +Map config = new
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821608#comment-15821608 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95972691 --- Diff: flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties --- @@ -0,0 +1,27 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +log4j.rootLogger=OFF, testlogger --- End diff -- Are you sure this is a valid log4j2 example? Log4j2 files seem to look differently: http://howtodoinjava.com/log4j2/log4j-2-properties-file-configuration-example/ Is the ES5 connector pulling log4j2 as a dependency? Can we avoid that? > Elasticsearch 5.x support > - > > Key: FLINK-4988 > URL: https://issues.apache.org/jira/browse/FLINK-4988 > Project: Flink > Issue Type: New Feature >Reporter: Mike Dias > > Elasticsearch 5.x was released: > https://www.elastic.co/blog/elasticsearch-5-0-0-released -- This message was sent by Atlassian JIRA (v6.3.4#6332)
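For contrast with the 1.x-style file quoted above, a Log4j 2.x properties file uses a key-per-component layout (supported in the properties format since Log4j 2.4). A sketch of a roughly equivalent "root logger off, console appender defined" configuration, assuming that was the intent of the original file:

```properties
# Log4j 2.x properties syntax (contrast with the 1.x "log4j.rootLogger=..." style above)
status = error

appender.testlogger.type = Console
appender.testlogger.name = testlogger
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

rootLogger.level = off
rootLogger.appenderRef.testlogger.ref = testlogger
```

Log4j 2 silently ignores 1.x-style keys in a `log4j2.properties` file, which is why the file as submitted would not actually configure anything.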
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821602#comment-15821602 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95972131 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java --- @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. --- End diff -- This license header seems to be different from the other files. > Elasticsearch 5.x support > - > > Key: FLINK-4988 > URL: https://issues.apache.org/jira/browse/FLINK-4988 > Project: Flink > Issue Type: New Feature >Reporter: Mike Dias > > Elasticsearch 5.x was released: > https://www.elastic.co/blog/elasticsearch-5-0-0-released -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821604#comment-15821604 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95969879 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.InstantiationUtil; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Base class for all Flink Elasticsearch Sinks. + * + * + * This class implements the common behaviour across Elasticsearch versions, such as + * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before + * sending the requests to the cluster, as well as passing input records to the user provided + * {@link ElasticsearchSinkFunction} for processing. + * + * + * The version specific behaviours for creating a {@link Client} to connect to a Elasticsearch cluster + * should be defined by concrete implementations of a {@link ElasticsearchClientFactory}, which is to be provided to the + * constructor of this class. 
+ * + * @param Type of the elements emitted by this sink + */ +public abstract class ElasticsearchSinkBase extends RichSinkFunction { + + private static final long serialVersionUID = -1007596293618451942L; + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class); + + // + // Internal bulk processor configuration + // + + public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; + public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; + public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; + + private final Integer bulkProcessorFlushMaxActions; + private final Integer bulkProcessorFlushMaxSizeMb; + private final Integer bulkProcessorFlushIntervalMillis; --- End diff -- Why are you using boxed types here? > Elasticsearch 5.x support > - > > Key: FLINK-4988 > URL: https://issues.apache.org/jira/browse/FLINK-4988 > Project: Flink > Issue Type: New Feature >Reporter: Mike Dias > > Elasticsearch 5.x was released: > https://www.elastic.co/blog/elasticsearch-5-0-0-released -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821603#comment-15821603 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95972955 --- Diff: flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.elasticsearch5; + +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase { + + @Test + public void testTransportClient() throws Exception { + runTransportClientTest(); + } + + @Test + public void testNullTransportClient() throws Exception { + runNullTransportClientTest(); + } + + @Test + public void testEmptyTransportClient() throws Exception { + runEmptyTransportClientTest(); + } + + @Test + public void testTransportClientFails() throws Exception { + runTransportClientFailsTest(); + } + + @Override + protected ElasticsearchSinkBase createElasticsearchSink(Map userConfig, + List transportAddresses, + ElasticsearchSinkFunction elasticsearchSinkFunction) { --- End diff -- Whitespace. (In general, this PR contains a lot of empty lines / vertical whitespace.) > Elasticsearch 5.x support > - > > Key: FLINK-4988 > URL: https://issues.apache.org/jira/browse/FLINK-4988 > Project: Flink > Issue Type: New Feature >Reporter: Mike Dias > > Elasticsearch 5.x was released: > https://www.elastic.co/blog/elasticsearch-5-0-0-released -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821606#comment-15821606 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95974130
--- Diff: flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java ---
[The quoted diff removes the self-contained test setup from ElasticsearchSinkITCase (the embedded Elasticsearch Node with TemporaryFolder data directory, the per-element bulk flush config, and the manual result verification via a transport client) and makes the class extend the new shared ElasticsearchSinkTestBase, so that e.g. testTransportClient() now simply delegates to runTransportClientTest().]
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821601#comment-15821601 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95973995
--- Diff: flink-connectors/flink-connector-elasticsearch5/pom.xml ---
[The quoted diff adds a new pom.xml for the flink-connector-elasticsearch5_2.10 module: parent flink-connectors 1.3-SNAPSHOT, elasticsearch.version 5.0.0, dependencies on flink-streaming-java_2.10 (provided) and flink-connector-elasticsearch-base_2.10 (excluding org.elasticsearch:elasticsearch), the Elasticsearch transport client, and log4j-api/log4j-core 2.7.]
--- End diff --
How does ES5 work when executed in a Flink program? Does it write the logs correctly into the taskmanager.log file using log4j2?
> Elasticsearch 5.x support
> -------------------------
>
>                 Key: FLINK-4988
>                 URL: https://issues.apache.org/jira/browse/FLINK-4988
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Mike Dias
>
> Elasticsearch 5.x was released:
> https://www.elastic.co/blog/elasticsearch-5-0-0-released
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821609#comment-15821609 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95973065
--- Diff: flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchClientFactoryImpl.java ---
[The quoted diff adds ElasticsearchClientFactoryImpl, a concrete ElasticsearchClientFactory for Elasticsearch 5.x that builds a PreBuiltTransportClient from user-provided transport addresses; InetSocketAddress is used for the addresses because TransportAddress is not serializable in Elasticsearch 5.x.]
--- End diff --
Tab indentation
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821600#comment-15821600 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3112#discussion_r95969031
--- Diff: docs/dev/connectors/elasticsearch2.md ---
[The quoted diff deletes the entire 173-line Elasticsearch 2.x documentation page.]
--- End diff --
Can you replace the es2 page with a redirect to the new page? This way existing links are not broken.
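For illustration, one common way to keep the old URL working is a stub page that issues a client-side redirect. This is only a sketch of the general approach; the target path and the mechanism actually used in the Flink documentation build are assumptions, not taken from the merged change:

```html
<!-- Hypothetical stub for docs/dev/connectors/elasticsearch2.md's rendered page:
     keeps old links alive by forwarding to the consolidated Elasticsearch page. -->
<meta http-equiv="refresh" content="0; url=elasticsearch.html">
<p>This page has moved to <a href="elasticsearch.html">Elasticsearch Connector</a>.</p>
```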
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821574#comment-15821574 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3112
Thanks for the pointers. We'll include the profile `include-elasticsearch5` for Java 8 builds only.
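For context, a JDK-activated profile along the lines discussed here could look like the following sketch in the parent pom. The profile name `include-elasticsearch5` comes from the comment above; the activation range and module wiring are assumptions, not the build setup that was actually merged:

```xml
<!-- Hypothetical sketch: build the ES 5.x connector module only on Java 8+ JDKs -->
<profile>
    <id>include-elasticsearch5</id>
    <activation>
        <jdk>[1.8,)</jdk>
    </activation>
    <modules>
        <module>flink-connector-elasticsearch5</module>
    </modules>
</profile>
```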
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821556#comment-15821556 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3112
Using Maven build profiles, you can probably include the es5 connector in Java 8 builds only.
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821553#comment-15821553 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3112
@rmetzger Ah ... seems like ES 5.x requires at least Java 8.
- https://www.elastic.co/guide/en/elasticsearch/reference/master/_installation.html#_installation
- https://github.com/elastic/elasticsearch/issues/17584
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821544#comment-15821544 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3112
Thanks a lot for opening a pull request for this. It looks like some of the tests are failing on travis. Does ES5 support Java 7?
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821404#comment-15821404 ] ASF GitHub Bot commented on FLINK-4988: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/3112
[FLINK-4988] [elasticsearch] Add Elasticsearch 5.x Connector
This PR is based on @mikedias's work in #2767 (1st commit), with additional work to restructure the Elasticsearch connectors (2nd commit). Basically, we now have a `flink-connector-elasticsearch-base` containing common behaviour and test code.
### Deprecated Constructors for Elasticsearch 1.x / 2.x
As part of the restructuring, all connector versions now take an `ElasticsearchSinkFunction` for full functional Elasticsearch support (previously, 1.x took an `IndexRequestBuilder`, which was limited to indexing actions on an Elasticsearch index). The `ElasticsearchSinkFunction` was also relocated from package `o.a.f.s.c.elasticsearch2` in `flink-connector-elasticsearch2` to `o.a.f.s.c.elasticsearch` in `flink-connector-elasticsearch-base`. This resulted in deprecation of the following:
1. In Elasticsearch 1.x: all original `IndexRequestBuilder` constructors, as well as the interface itself, have been deprecated.
2. In Elasticsearch 2.x: due to the package relocation of `ElasticsearchSinkFunction`, all original constructors are also deprecated in favor of the new package path for the class.
R: @rmetzger @mikedias please feel free to review, thank you!
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/tzulitai/flink FLINK-4988
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/3112.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #3112
commit 746cbb4dd029837d9955cd3138444f70305ac542
Author: Mike Dias
Date:   2016-11-07T20:09:48Z
    [FLINK-4988] Elasticsearch 5.x support
commit 86482962b250899e9ac076768ff98bf8fbee58f8
Author: Tzu-Li (Gordon) Tai
Date:   2017-01-12T13:21:56Z
    [FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors
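The functional difference the PR description refers to — an `IndexRequestBuilder` mapping one element to exactly one index request, versus an `ElasticsearchSinkFunction` emitting arbitrarily many requests through an indexer — can be sketched with simplified stand-in types. These are illustrative stand-ins only, not the real Flink or Elasticsearch interfaces (the real `RequestIndexer.add` takes Elasticsearch `ActionRequest`s, for example):

```java
import java.util.ArrayList;
import java.util.List;

public class SinkFunctionSketch {

    // Stand-in for the deprecated 1.x-style interface: one element -> one request.
    interface IndexRequestBuilder<T> {
        String createIndexRequest(T element);
    }

    // Stand-in for the indexer handed to the sink function by the sink.
    interface RequestIndexer {
        void add(String... requests);
    }

    // Stand-in for the shared interface: one element -> any number of requests.
    interface ElasticsearchSinkFunction<T> {
        void process(T element, RequestIndexer indexer);
    }

    // Drives the sink function over a batch of elements, collecting every
    // request it emits (the role the real sink's BulkProcessor would play).
    static List<String> runSinkFunction(ElasticsearchSinkFunction<String> fn, List<String> elements) {
        List<String> collected = new ArrayList<>();
        RequestIndexer indexer = (String... requests) -> {
            for (String r : requests) {
                collected.add(r);
            }
        };
        for (String e : elements) {
            fn.process(e, indexer);
        }
        return collected;
    }

    public static void main(String[] args) {
        // A sink function may emit several actions per element, which the
        // one-to-one IndexRequestBuilder contract could not express.
        ElasticsearchSinkFunction<String> fn = (element, indexer) ->
            indexer.add("index:" + element, "update:" + element);

        System.out.println(runSinkFunction(fn, List.of("a", "b")));
        // prints [index:a, update:a, index:b, update:b]
    }
}
```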
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15818625#comment-15818625 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2767#discussion_r95601743
--- Diff: flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java ---
[The quoted diff adds the new ElasticsearchSink for Elasticsearch 5.x: a RichSinkFunction that creates a PreBuiltTransportClient from a user config Map (e.g. cluster.name, forwarded when creating the TransportClient; the sink fails if no cluster can be connected to) and buffers elements through a BulkProcessor configured via bulk.flush.max.actions (maximum number of elements to buffer), bulk.flush.max.size.mb (maximum amount of data in megabytes to buffer), and bulk.flush.interval.ms (interval at which to flush regardless of the other two settings).]
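The three BulkProcessor-related keys listed in the quoted Javadoc can be illustrated with a small self-contained sketch of how a user-supplied config map is assembled and read. The key names and constant names come from the quoted code; the `intSetting` helper is hypothetical, standing in for the parsing the real sink performs before configuring its BulkProcessor:

```java
import java.util.HashMap;
import java.util.Map;

public class BulkFlushConfigSketch {

    // Key names as declared in the quoted ElasticsearchSink code.
    static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
    static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
    static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";

    // Hypothetical helper: read an integer setting, falling back to a default
    // when the user did not supply the key.
    static int intSetting(Map<String, String> config, String key, int defaultValue) {
        String v = config.get(key);
        return v == null ? defaultValue : Integer.parseInt(v);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", "my-cluster");              // forwarded to the TransportClient
        config.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");    // flush after every element
        config.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "5000"); // flush at least every 5 s

        System.out.println(intSetting(config, CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 1000)); // prints 1
        System.out.println(intSetting(config, CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, 5));    // prints 5 (default)
    }
}
```

Setting `bulk.flush.max.actions` to `1`, as the integration test in this PR does, disables buffering entirely so every element is sent immediately.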
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15816496#comment-15816496 ] ASF GitHub Bot commented on FLINK-4988: --- Github user mikedias commented on a diff in the pull request: https://github.com/apache/flink/pull/2767#discussion_r95480042
--- Diff: flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java ---
[This review comment quotes the same ElasticsearchSink.java hunk (the new RichSinkFunction-based ES 5.x sink with its TransportClient config Map and BulkProcessor flush keys); the reply text itself was truncated.]
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814970#comment-15814970 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2767
Sounds good. Thank you for taking care of this @tzulitai
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813546#comment-15813546 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2767#discussion_r95287899
--- Diff: flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java ---
[This review comment quotes the same ElasticsearchSink.java hunk (the new RichSinkFunction-based ES 5.x sink with its TransportClient config Map and BulkProcessor flush keys); the reply text itself was truncated.]
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812217#comment-15812217 ] ASF GitHub Bot commented on FLINK-4988: --- Github user mikedias commented on the issue: https://github.com/apache/flink/pull/2767
@tzulitai sure, no problem! :)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15811558#comment-15811558 ] ASF GitHub Bot commented on FLINK-4988: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2767
@mikedias @rmetzger @StephanEwen I've also responded to the discussion in the ML started by Robert with a +1. A recap of the proposed approach on how we proceed: since the ES connectors share a lot of code, we'll refactor the ES connectors by having a `flink-connector-elasticsearch-base` module, and let version-specific connectors stem from that; basically how we're currently maintaining the Kafka connectors. There are two +1 votes for this approach, and no other objections, so I think we can agree on proceeding. @mikedias, are you ok with me using your PR as a base to refactor the ES connectors? I'll open a new PR with your changes and mine together. I'll also let you know when the PR is opened :)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15761814#comment-15761814 ] ASF GitHub Bot commented on FLINK-4988: --- Github user mikedias commented on the issue: https://github.com/apache/flink/pull/2767
I have no problem in hosting the connector in my github account.
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15761459#comment-15761459 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2767
@StephanEwen I'll start a discussion on the mailing list to decide how we want to proceed.
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15761456#comment-15761456 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2767#discussion_r93052912
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch5/pom.xml ---
[The quoted diff adds a pom.xml for flink-connector-elasticsearch5_2.10 under the old flink-streaming-connectors parent (1.2-SNAPSHOT), with elasticsearch.version 5.0.0, flink-streaming-java_2.10 (provided), the Elasticsearch transport client, and log4j-api/log4j-core 2.7 dependencies.]
--- End diff --
Why did you add log4j2 dependencies to the project?
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15761458#comment-15761458 ] ASF GitHub Bot commented on FLINK-4988: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2767#discussion_r93053073
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch5/pom.xml ---
@@ -0,0 +1,93 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-connectors</artifactId>
--- End diff --
The parent's name has changed in the meantime (we've refactored our Maven structure a bit for the connectors)
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15761242#comment-15761242 ] ASF GitHub Bot commented on FLINK-4988: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2767 @mikedias @rmetzger How do we proceed with this pull request? Should it become part of Flink, Bahir, or should it be hosted in the contributor's repository?
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692894#comment-15692894 ] ASF GitHub Bot commented on FLINK-4988: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2767 If you cannot exclude the netty dependency, you could try to shade it away: https://maven.apache.org/plugins/maven-shade-plugin/ We do this all the time for conflicting dependencies.
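A minimal sketch of what the suggested shading could look like in the connector's pom.xml. The relocation pattern and the shaded package prefix below are illustrative assumptions for this sketch, not the configuration that was eventually committed:

```xml
<!-- Illustrative maven-shade-plugin setup: relocates the connector's
     Netty classes under a connector-specific package so they cannot
     clash with Flink's own internal Netty version. The shadedPattern
     package name is an assumption, not the committed value. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <relocation>
                        <pattern>io.netty</pattern>
                        <shadedPattern>org.apache.flink.elasticsearch5.shaded.io.netty</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>
```

With such a relocation, the connector jar carries its own renamed copy of Netty, so a user program can depend on the connector and on Flink's runtime without a classpath conflict.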
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690831#comment-15690831 ] ASF GitHub Bot commented on FLINK-4988: --- Github user mikedias commented on the issue: https://github.com/apache/flink/pull/2767 Not sure if I can exclude the netty4 dependency, but I'll take a look.
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690828#comment-15690828 ] ASF GitHub Bot commented on FLINK-4988: --- Github user mikedias commented on the issue: https://github.com/apache/flink/pull/2767 No, ES is not backward compatible... But we can reuse some classes or interfaces between versions. I have plans to do this in another PR, just to isolate possible issues.
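A rough sketch of how such cross-version reuse could look: a serializable, version-agnostic interface that user code implements once, while each connector module supplies its own Elasticsearch-client-specific request indexer. All names here are hypothetical illustrations, not the interfaces that later landed in the Flink codebase:

```java
// Hypothetical version-agnostic sink contract shared by the 1.x/2.x/5.x
// connector modules; names are illustrative assumptions for this sketch.
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public interface ElasticsearchSinkFunction<T> extends Serializable {

    // Turn one stream element into one or more index requests.
    void process(T element, RequestIndexer indexer);

    // Abstraction over the Elasticsearch client's bulk machinery;
    // implemented once per supported Elasticsearch major version.
    interface RequestIndexer {
        void add(String index, String type, Map<String, Object> source);
    }

    // Simple in-memory indexer, useful for unit-testing user code
    // without any Elasticsearch client on the classpath.
    class CollectingIndexer implements RequestIndexer {
        public final List<Map<String, Object>> requests = new ArrayList<>();

        @Override
        public void add(String index, String type, Map<String, Object> source) {
            requests.add(source);
        }
    }
}
```

Because only the indexer abstraction touches the Elasticsearch client, user-facing code stays identical across client versions and possible issues remain isolated per connector module.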
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689964#comment-15689964 ] ASF GitHub Bot commented on FLINK-4988: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2767 From taking a quick look at it, I would suggest that this connector shades netty away. That way we can avoid conflicts whenever we adjust Flink's internal netty version.
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689960#comment-15689960 ] ASF GitHub Bot commented on FLINK-4988: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2767 Is Elasticsearch backwards compatible? Is there a way to support multiple Elasticsearch versions in one connector? It would be nice not having to maintain three different versions of the Elasticsearch connector. @rmetzger What is your take on this?
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15645352#comment-15645352 ] ASF GitHub Bot commented on FLINK-4988: --- GitHub user mikedias opened a pull request: https://github.com/apache/flink/pull/2767 [FLINK-4988] Elasticsearch 5.x support Provides Elasticsearch 5.x support based on previous 2.x codebase. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mikedias/flink FLINK-4988 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2767.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2767