[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895633#comment-15895633
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2767
  
I closed the PR manually using another commit.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mike Dias
> Fix For: 1.3.0
>
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895631#comment-15895631
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2767


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mike Dias
> Fix For: 1.3.0
>
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15856148#comment-15856148
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2767
  
Hi @mikedias! Your contribution has been merged with 
8699b03d79a441ca33d9f62b96490d29a0efaf44 and 
b5caaef82add4a6f424094d526700c77b011724e. Could you manually close this PR? The 
bot only closed the new restructure PR.

Thanks a lot for your contribution!


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15856140#comment-15856140
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3112


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15856129#comment-15856129
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r99837460
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,291 @@ specific language governing permissions and 
limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+
+  
+
+  Maven Dependency
+  Supported since
+  Elasticsearch version
+
+  
+  
+
+flink-connector-elasticsearch{{ site.scala_version_suffix 
}}
+1.0.0
+1.x
+
+
+flink-connector-elasticsearch2{{ site.scala_version_suffix 
}}
+1.0.0
+2.x
+
+
+flink-connector-elasticsearch5{{ site.scala_version_suffix 
}}
+1.2.0
+5.x
+
+  
+
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
+about how to package the program with the libraries for cluster execution.
 
  Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
 
  Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with 
Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 
-
+
{% highlight java %}
 DataStream<String> input = ...;

-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");

-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
+List<TransportAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, Object> json = new HashMap<>();
+        json.put("data", element);
+
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
{% endhighlight %}

-
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config 

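The documentation diff quoted above is truncated and lost its generic type parameters in the archive. For orientation, here is a minimal, self-contained sketch of a job wired to the reworked sink API it describes. It assumes the 1.3-era connector classes named in the pull request (`ElasticsearchSink` from a version-specific module, `ElasticsearchSinkFunction` and `RequestIndexer` from the base module) and the `InetSocketAddress`-based constructor shown in the test code quoted later in this thread; the example class name, cluster name, addresses, index, and type are placeholders.

```java
// Hypothetical stand-alone job; connector package names follow the quoted diff,
// all concrete values (addresses, cluster/index/type names) are placeholders.
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ElasticsearchSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("message #1", "message #2");

        // Config forwarded to the Elasticsearch client; the cluster name must match the cluster.
        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", "my-cluster-name");
        // Emit after every element; otherwise requests are buffered by the bulk processor.
        config.put("bulk.flush.max.actions", "1");

        // Transport addresses of the Elasticsearch nodes (placeholders).
        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

        input.addSink(new ElasticsearchSink<>(config, transportAddresses,
                new ElasticsearchSinkFunction<String>() {
            private IndexRequest createIndexRequest(String element) {
                Map<String, Object> json = new HashMap<>();
                json.put("data", element);
                return Requests.indexRequest()
                        .index("my-index")
                        .type("my-type")
                        .source(json);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        }));

        env.execute("Elasticsearch 5 sink example");
    }
}
```

A production job would typically point the transport addresses at the real cluster nodes and keep the default bulk flushing instead of flushing after every element.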
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15856130#comment-15856130
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3112
  
Merging to `master` ..


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855688#comment-15855688
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3112
  
Thank you for the review! I'll address the missing semicolons and merge 
this today.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855671#comment-15855671
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3112
  
+1 to merge.

I've tried the ES2 connector (just to check one of the connectors) and it 
worked well.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855662#comment-15855662
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r99783050
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,291 @@ specific language governing permissions and 
limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+
+  
+
+  Maven Dependency
+  Supported since
+  Elasticsearch version
+
+  
+  
+
+flink-connector-elasticsearch{{ site.scala_version_suffix 
}}
+1.0.0
+1.x
+
+
+flink-connector-elasticsearch2{{ site.scala_version_suffix 
}}
+1.0.0
+2.x
+
+
+flink-connector-elasticsearch5{{ site.scala_version_suffix 
}}
+1.2.0
+5.x
+
+  
+
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
+about how to package the program with the libraries for cluster execution.
 
  Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
 
  Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with 
Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 
-
+
{% highlight java %}
 DataStream<String> input = ...;

-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");

-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
+List<TransportAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, Object> json = new HashMap<>();
+        json.put("data", element);
+
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
{% endhighlight %}

-
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855663#comment-15855663
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r99783072
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,291 @@ specific language governing permissions and 
limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+
+  
+
+  Maven Dependency
+  Supported since
+  Elasticsearch version
+
+  
+  
+
+flink-connector-elasticsearch{{ site.scala_version_suffix 
}}
+1.0.0
+1.x
+
+
+flink-connector-elasticsearch2{{ site.scala_version_suffix 
}}
+1.0.0
+2.x
+
+
+flink-connector-elasticsearch5{{ site.scala_version_suffix 
}}
+1.2.0
+5.x
+
+  
+
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
+about how to package the program with the libraries for cluster execution.
 
  Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
 
  Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with 
Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 
-
+
{% highlight java %}
 DataStream<String> input = ...;

-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");

-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
+List<TransportAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, Object> json = new HashMap<>();
+        json.put("data", element);
+
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
{% endhighlight %}

-
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855661#comment-15855661
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r99782995
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,291 @@ specific language governing permissions and 
limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+
+  
+
+  Maven Dependency
+  Supported since
+  Elasticsearch version
+
+  
+  
+
+flink-connector-elasticsearch{{ site.scala_version_suffix 
}}
+1.0.0
+1.x
+
+
+flink-connector-elasticsearch2{{ site.scala_version_suffix 
}}
+1.0.0
+2.x
+
+
+flink-connector-elasticsearch5{{ site.scala_version_suffix 
}}
+1.2.0
+5.x
+
+  
+
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
+about how to package the program with the libraries for cluster execution.
 
  Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
 
  Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with 
Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 
-
+
 {% highlight java %}
 DataStream<String> input = ...;

-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
--- End diff --

There's a semicolon missing


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15847758#comment-15847758
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user fpompermaier commented on the issue:

https://github.com/apache/flink/pull/3112
  
Hi @tzulitai, congrats for the great work! For my use case it is important 
also to be resilient to malformed documents 
(https://issues.apache.org/jira/browse/FLINK-5353). Do you think you could 
address that issue as well?
That would be awesome for us :)


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15844214#comment-15844214
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user mikedias commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r98340492
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * 
+ * This class implements the common behaviour across Elasticsearch 
versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link 
ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records 
to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * 
+ * The version specific behaviours for creating a {@link Client} to 
connect to a Elasticsearch cluster
+ * should be defined by concrete implementations of a {@link 
ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
+
+   // ------------------------------------------------------------------------
+   //  User-facing API and configuration
+   // ------------------------------------------------------------------------
+
+   /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+   private final Map<String, String> userConfig;
+
+   /** The function that is used to construct mulitple {@link ActionRequest ActionRequests} from each incoming element. */
+   private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+   /** Provided to the user via the {@link 

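The `ElasticsearchSinkBase` excerpt quoted above defines the bulk-flush configuration keys shared by all connector versions. Below is a small sketch of a user config map driving the internal `BulkProcessor`; the key names come from the quoted `CONFIG_KEY_*` constants, while the helper class name and the numeric values are illustrative assumptions, not taken from the pull request.

```java
import java.util.HashMap;
import java.util.Map;

public class BulkFlushConfigExample {

    /** Builds a user config map; values are illustrative, keys mirror the quoted constants. */
    public static Map<String, String> bulkFlushConfig() {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("cluster.name", "my-cluster-name");    // forwarded to the Elasticsearch client
        userConfig.put("bulk.flush.max.actions", "1000");     // flush after 1000 buffered action requests
        userConfig.put("bulk.flush.max.size.mb", "5");        // ... or once 5 MB of requests are buffered
        userConfig.put("bulk.flush.interval.ms", "60000");    // ... or at least once every 60 seconds
        return userConfig;
    }
}
```

Whichever threshold is reached first triggers a flush of the buffered action requests; setting `bulk.flush.max.actions` to `1`, as the documentation example does, effectively disables buffering.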
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15844213#comment-15844213
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user mikedias commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r98340529
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * 
+ * This class implements the common behaviour across Elasticsearch 
versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link 
ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records 
to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * 
+ * The version specific behaviours for creating a {@link Client} to 
connect to a Elasticsearch cluster
+ * should be defined by concrete implementations of a {@link 
ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
+
+   // ------------------------------------------------------------------------
+   //  User-facing API and configuration
+   // ------------------------------------------------------------------------
+
+   /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+   private final Map<String, String> userConfig;
+
+   /** The function that is used to construct mulitple {@link ActionRequest ActionRequests} from each incoming element. */
+   private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+   /** Provided to the user via the {@link 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15844212#comment-15844212
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user mikedias commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r98340549
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * 
+ * This class implements the common behaviour across Elasticsearch 
versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link 
ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records 
to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * 
+ * The version specific behaviours for creating a {@link Client} to 
connect to a Elasticsearch cluster
+ * should be defined by concrete implementations of a {@link 
ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
+
+   // ------------------------------------------------------------------------
+   //  User-facing API and configuration
+   // ------------------------------------------------------------------------
+
+   /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+   private final Map<String, String> userConfig;
+
+   /** The function that is used to construct mulitple {@link ActionRequest ActionRequests} from each incoming element. */
+   private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+   /** Provided to the user via the {@link 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824325#comment-15824325
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r96274454
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * 
+ * This class implements the common behaviour across Elasticsearch 
versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link 
ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records 
to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * 
+ * The version specific behaviours for creating a {@link Client} to 
connect to a Elasticsearch cluster
+ * should be defined by concrete implementations of a {@link 
ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
+
+   // ------------------------------------------------------------------------
+   //  User-facing API and configuration
+   // ------------------------------------------------------------------------
+
+   /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+   private final Map<String, String> userConfig;
+
+   /** The function that is used to construct mulitple {@link ActionRequest ActionRequests} from each incoming element. */
+   private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+   /** Provided to the user via the {@link 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824230#comment-15824230
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r96265039
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * 
+ * This class implements the common behaviour across Elasticsearch 
versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link 
ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records 
to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * 
+ * The version specific behaviours for creating a {@link Client} to 
connect to a Elasticsearch cluster
+ * should be defined by concrete implementations of a {@link 
ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
+
+   // ------------------------------------------------------------------------
+   //  User-facing API and configuration
+   // ------------------------------------------------------------------------
+
+   /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+   private final Map<String, String> userConfig;
+
+   /** The function that is used to construct mulitple {@link ActionRequest ActionRequests} from each incoming element. */
+   private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+   /** Provided to the user via the {@link 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824073#comment-15824073
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3112
  
@rmetzger The comments are all addressed now, including the last two 
problems.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823726#comment-15823726
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3112
  
Gordon and I had a quick offline chat about this and decided the following:
1. We'll use the `ServiceResourceTransformer` because that's the correct 
way of solving the problem
2. There is a log4j2 to SLF4J bridge we'll try to use.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823624#comment-15823624
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3112
  
Hi @rmetzger, I've addressed your review comments. I still have some 
problems however, on the following:

1. I can't really reproduce the problem with the conflicts in 
`META-INF/services` dependencies, hence couldn't really try out whether that 
actually works correctly or if would actually use the 
`ServiceResourceTransformer` instead of `AppenderTransformer`. From the Maven 
documentation and what I understand from what the original ES2 documentation 
was trying to address, I think we can use the `ServiceResourceTransformer`. 
What do you think?

2. Regarding the Log4j 2 dependency: it is required for the ES 5 Java client 
to log correctly, since the ES 5 Java API now uses Log4j 2, does not detect 
other logging implementations, and must have a Log4j 2 API on the classpath. So 
the way I think the ES 5 connector works is that the connector logs and Flink 
itself use Log4j 1, while the internally used ES Java client uses Log4j 2, 
which is included exclusively in ES 5's POM.

I am still figuring out how to get the internal ES Java client in the ES 5 
connector to log to TaskManager logs when using cluster execution, though (the 
connector log and Flink log is correctly logged, only the ES Java client log is 
missing).

I've included this `log4j2.properties` in a test project to be packaged for 
execution:
```
appender.file.type=File
appender.file.filename=${log.file}
appender.file.name=file
appender.file.layout.type=PatternLayout
appender.file.layout.pattern=%-4r [%t] %-5p %c %x - %m%n

rootLogger.level=info
rootLogger.appenderRef.file.ref=file
```

Somehow, it isn't picking the `log.file` property, which is set by the 
`flink-daemon.sh` script as a system property. Changing `log.file` to some 
another specific file path works.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821662#comment-15821662
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95979696
  
--- Diff: .travis.yml ---
@@ -16,14 +16,14 @@ matrix:
   include:
   # Always run test groups A and B together
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 
-Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis 
-Dmaven.javadoc.skip=true"
+  env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 
-Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,include-elasticsearch5 
-Dmaven.javadoc.skip=true"
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 
-Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis 
-Dmaven.javadoc.skip=true"
+  env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 
-Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis,include-elasticsearch5 
-Dmaven.javadoc.skip=true"
 
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.6.3 
-Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis 
-Dmaven.javadoc.skip=true"
+  env: PROFILE="-Dhadoop.version=2.6.3 
-Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,include-elasticsearch5 
-Dmaven.javadoc.skip=true"
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.6.3 
-Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis 
-Dmaven.javadoc.skip=true"
+  env: PROFILE="-Dhadoop.version=2.6.3 
-Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis,include-elasticsearch5 
-Dmaven.javadoc.skip=true"
--- End diff --

With the automatic activation, you don't need these changes (see my other 
comment at the profile def)


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821661#comment-15821661
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95979634
  
--- Diff: flink-connectors/pom.xml ---
@@ -85,6 +86,17 @@ under the License.
flink-connector-kinesis


+
+   
+   
+   include-elasticsearch5
--- End diff --

Can you make this a profile that activates itself automatically when java 8 
is available?
http://maven.apache.org/guides/introduction/introduction-to-profiles.html


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821655#comment-15821655
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95979234
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
 ---
@@ -17,217 +17,51 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.junit.Assert;
-import org.junit.ClassRule;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ElasticsearchSinkITCase extends 
StreamingMultipleProgramsTestBase {
-
-   private static final int NUM_ELEMENTS = 20;
-
-   @ClassRule
-   public static TemporaryFolder tempFolder = new TemporaryFolder();
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
@Test
public void testTransportClient() throws Exception {
-
-   File dataDir = tempFolder.newFolder();
-
-   Node node = NodeBuilder.nodeBuilder()
-   .settings(Settings.settingsBuilder()
-   .put("path.home", 
dataDir.getParent())
-   .put("http.enabled", false)
-   .put("path.data", 
dataDir.getAbsolutePath()))
-   // set a custom cluster name to verify that 
user config works correctly
-   .clusterName("my-transport-client-cluster")
-   .node();
-
-   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-   DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-   Map<String, String> config = new HashMap<>();
-   // This instructs the sink to emit after every element, otherwise they would be buffered
-   config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-   config.put("cluster.name", "my-transport-client-cluster");
-
-   // Can't use {@link TransportAddress} as its not Serializable in Elasticsearch 2.x
-   List<InetSocketAddress> transports = new ArrayList<>();
-   transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-   source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction()));
-
-   env.execute("Elasticsearch TransportClient Test");
-
-   // verify the results
-   Client client = node.client();
-   for (int i = 0; i < NUM_ELEMENTS; i++) {
-   GetResponse response = client.get(new 
GetRequest("my-index",
-   "my-type", 
Integer.toString(i))).actionGet();
-   Assert.assertEquals("message #" + i, 
response.getSource().get("data"));
-   }
-
-   node.close();
+   runTransportClientTest();
}
 
- @Test(expected = IllegalArgumentException.class)
- public void testNullTransportClient() throws Exception {
-
-   File dataDir = tempFolder.newFolder();
-
-   Node node = 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821654#comment-15821654
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95979205
  
--- Diff: docs/dev/connectors/elasticsearch2.md ---
@@ -1,173 +0,0 @@

--- End diff --

Will do!


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821651#comment-15821651
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95979032
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * 
+ * This class implements the common behaviour across Elasticsearch 
versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link 
ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records 
to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * 
+ * The version specific behaviours for creating a {@link Client} to 
connect to a Elasticsearch cluster
+ * should be defined by concrete implementations of a {@link 
ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // 

+   //  Internal bulk processor configuration
+   // 

+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
--- End diff --

Ah, I see. You need boxed types for nullability. Then I would actually keep 
it as is.
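
For illustration, a minimal sketch (not the PR's actual code) of the pattern under discussion: a boxed `Integer` so that `null` can stand for "not configured by the user", with the value only applied to the `BulkProcessor` builder when present. The helper class and method names are made up; the config key is the one shown in the diff above.

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.elasticsearch.action.bulk.BulkProcessor;

    import java.util.Map;

    class BulkFlushSettings {
        // returns the configured value, or null when the user did not set the key
        static Integer optionalInt(Map<String, String> userConfig, String key) {
            ParameterTool params = ParameterTool.fromMap(userConfig);
            return params.has(key) ? params.getInt(key) : null;
        }

        static void applyMaxActions(BulkProcessor.Builder builder, Map<String, String> userConfig) {
            Integer maxActions = optionalInt(userConfig, "bulk.flush.max.actions");
            if (maxActions != null) {
                // only override the BulkProcessor default when a value was provided
                builder.setBulkActions(maxActions);
            }
        }
    }

A plain `int` with a sentinel value (e.g. -1) would work as well, which is the alternative mentioned earlier in the thread; the boxed variant just avoids reserving a magic number.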


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821643#comment-15821643
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95978090
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
 ---
@@ -17,217 +17,51 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.junit.Assert;
-import org.junit.ClassRule;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ElasticsearchSinkITCase extends 
StreamingMultipleProgramsTestBase {
-
-   private static final int NUM_ELEMENTS = 20;
-
-   @ClassRule
-   public static TemporaryFolder tempFolder = new TemporaryFolder();
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
@Test
public void testTransportClient() throws Exception {
-
-   File dataDir = tempFolder.newFolder();
-
-   Node node = NodeBuilder.nodeBuilder()
-   .settings(Settings.settingsBuilder()
-   .put("path.home", 
dataDir.getParent())
-   .put("http.enabled", false)
-   .put("path.data", 
dataDir.getAbsolutePath()))
-   // set a custom cluster name to verify that 
user config works correctly
-   .clusterName("my-transport-client-cluster")
-   .node();
-
-   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-   DataStreamSource> source = 
env.addSource(new TestSourceFunction());
-
-   Map config = new HashMap<>();
-   // This instructs the sink to emit after every element, 
otherwise they would be buffered
-   config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
-   config.put("cluster.name", "my-transport-client-cluster");
-
-   // Can't use {@link TransportAddress} as its not Serializable 
in Elasticsearch 2.x
-   List transports = new ArrayList<>();
-   transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-   source.addSink(new ElasticsearchSink<>(config, transports, new 
TestElasticsearchSinkFunction()));
-
-   env.execute("Elasticsearch TransportClient Test");
-
-   // verify the results
-   Client client = node.client();
-   for (int i = 0; i < NUM_ELEMENTS; i++) {
-   GetResponse response = client.get(new 
GetRequest("my-index",
-   "my-type", 
Integer.toString(i))).actionGet();
-   Assert.assertEquals("message #" + i, 
response.getSource().get("data"));
-   }
-
-   node.close();
+   runTransportClientTest();
}
 
- @Test(expected = IllegalArgumentException.class)
- public void testNullTransportClient() throws Exception {
-
-   File dataDir = tempFolder.newFolder();
-
-   Node node = 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821644#comment-15821644
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95978123
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,284 @@ specific language governing permissions and 
limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-
-  org.apache.flink
-  flink-connector-elasticsearch{{ site.scala_version_suffix 
}}
-  {{site.version }}
-
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+
+  
+
+  Maven Dependency
+  Supported since
+  Elasticsearch version
+
+  
+  
+
+flink-connector-elasticsearch{{ site.scala_version_suffix 
}}
+1.0.0
+1.x
+
+
+flink-connector-elasticsearch2{{ site.scala_version_suffix 
}}
+1.0.0
+2.x
+
+
+flink-connector-elasticsearch5{{ site.scala_version_suffix 
}}
+1.2.0
+5.x
+
+  
+
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking) for information
+about how to package the program with the libraries for cluster execution.
 
  Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
 
  Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with 
Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 
-
+
 {% highlight java %}
 DataStream<String> input = ...;
 
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name");
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
 
-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
+List<TransportAddress> transportAddresses = new ArrayList<TransportAddress>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
 
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, Object> json = new HashMap<>();
+        json.put("data", element);
+
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
 {% endhighlight %}
 
-
+
+{% highlight java %}
+DataStream input = ...;
+
+Map config = new 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821642#comment-15821642
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95977827
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   @Test
+   public void testTransportClient() throws Exception {
+   runTransportClientTest();
+   }
+
+   @Test
+   public void testNullTransportClient() throws Exception {
+   runNullTransportClientTest();
+   }
+
+   @Test
+   public void testEmptyTransportClient() throws Exception {
+   runEmptyTransportClientTest();
+   }
+
+   @Test
+   public void testTransportClientFails() throws Exception {
+   runTransportClientFailsTest();
+   }
+
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+																	List<InetSocketAddress> transportAddresses,
+																	ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
--- End diff --

I'll double-check the styling in this PR nevertheless.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821640#comment-15821640
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95977656
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   @Test
+   public void testTransportClient() throws Exception {
+   runTransportClientTest();
+   }
+
+   @Test
+   public void testNullTransportClient() throws Exception {
+   runNullTransportClientTest();
+   }
+
+   @Test
+   public void testEmptyTransportClient() throws Exception {
+   runEmptyTransportClientTest();
+   }
+
+   @Test
+   public void testTransportClientFails() throws Exception {
+   runTransportClientFailsTest();
+   }
+
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+																	List<InetSocketAddress> transportAddresses,
+																	ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
--- End diff --

Do you mean an empty line? I don't think there are any trailing whitespaces.
It isn't actually an empty line; the method parameters are over to the right.
You need to drag the horizontal scroll bar to see them, otherwise in the IDE they seem 
to be aligned.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821634#comment-15821634
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95977306
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * 
+ * This class implements the common behaviour across Elasticsearch 
versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link 
ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records 
to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * 
+ * The version specific behaviours for creating a {@link Client} to 
connect to a Elasticsearch cluster
+ * should be defined by concrete implementations of a {@link 
ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // 

+   //  Internal bulk processor configuration
+   // 

+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
--- End diff --

I'll change this to `int` and use special values to represent that the user 
hasn't set a value.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821605#comment-15821605
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95969123
  
--- Diff: docs/dev/connectors/elasticsearch2.md ---
@@ -1,173 +0,0 @@

-title: "Elasticsearch 2.x Connector"
-nav-title: Elasticsearch 2.x
-nav-parent_id: connectors
-nav-pos: 5

-
-
-This connector provides a Sink that can write to an
-[Elasticsearch 2.x](https://elastic.co/) Index. To use this connector, add 
the
-following dependency to your project:
-
-{% highlight xml %}
-
-  org.apache.flink
-  flink-connector-elasticsearch2{{ site.scala_version_suffix 
}}
-  {{site.version }}
-
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking)
-for information about how to package the program with the libraries for
-cluster execution.
-
- Installing Elasticsearch 2.x
-
-Instructions for setting up an Elasticsearch cluster can be found

-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
-
- Elasticsearch 2.x Sink
-The connector provides a Sink that can send data to an Elasticsearch 2.x 
Index.
-
-The sink communicates with Elasticsearch via Transport Client
-
-See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html)
-for information about the Transport Client.
-
-The code below shows how to create a sink that uses a `TransportClient` 
for communication:
-
-
-
-{% highlight java %}
-File dataDir = ;
-
-DataStream input = ...;
-
-Map config = new HashMap<>();
-// This instructs the sink to emit after every element, otherwise they 
would be buffered
-config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
-
-List transports = new ArrayList<>();
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 
9300));
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 
9300));
-
-input.addSink(new ElasticsearchSink(config, transports, new 
ElasticsearchSinkFunction() {
-  public IndexRequest createIndexRequest(String element) {
-Map json = new HashMap<>();
-json.put("data", element);
-
-return Requests.indexRequest()
-.index("my-index")
-.type("my-type")
-.source(json);
-  }
-
-  @Override
-  public void process(String element, RuntimeContext ctx, RequestIndexer 
indexer) {
-indexer.add(createIndexRequest(element));
-  }
-}));
-{% endhighlight %}
-
-
-{% highlight scala %}
-val dataDir = ;
-
-val input: DataStream[String] = ...
-
-val config = new util.HashMap[String, String]
-config.put("bulk.flush.max.actions", "1")
-config.put("cluster.name", "my-cluster-name")
-
-val transports = new ArrayList[String]
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 
9300))
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 
9300));
-
-input.addSink(new ElasticsearchSink(config, transports, new 
ElasticsearchSinkFunction[String] {
-  def createIndexRequest(element: String): IndexRequest = {
-val json = new util.HashMap[String, AnyRef]
-json.put("data", element)
-Requests.indexRequest.index("my-index").`type`("my-type").source(json)
-  }
-
-  override def process(element: String, ctx: RuntimeContext, indexer: 
RequestIndexer) {
-indexer.add(createIndexRequest(element))
-  }
-}))
-{% endhighlight %}
-
-
-
-A Map of Strings is used to configure the Sink. The configuration keys
-are documented in the Elasticsearch documentation

-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
-Especially important is the `cluster.name`. parameter that must correspond 
to
-the name of your cluster and with ElasticSearch 2x you also need to 
specify `path.home`.
-
-Internally, the sink uses a `BulkProcessor` to send Action requests to the 
cluster.
-This will buffer elements and Action Requests before sending to the 
cluster. The behaviour of the
-`BulkProcessor` can be configured using these config keys:
- * **bulk.flush.max.actions**: Maximum amount of elements to 
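
For reference, a minimal sketch (not part of the quoted docs) of setting the bulk-flush keys listed above on the sink's configuration map, in the same snippet style as the connector examples; the values are illustrative only.

    Map<String, String> config = new HashMap<>();     // java.util.Map / java.util.HashMap
    config.put("cluster.name", "my-cluster-name");
    config.put("bulk.flush.max.actions", "100");      // flush after 100 buffered action requests
    config.put("bulk.flush.max.size.mb", "5");        // ... or once 5 MB of data are buffered
    config.put("bulk.flush.interval.ms", "10000");    // ... or at the latest every 10 seconds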

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821607#comment-15821607
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95968855
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,284 @@ specific language governing permissions and 
limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-
-  org.apache.flink
-  flink-connector-elasticsearch{{ site.scala_version_suffix 
}}
-  {{site.version }}
-
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+
+  
+
+  Maven Dependency
+  Supported since
+  Elasticsearch version
+
+  
+  
+
+flink-connector-elasticsearch{{ site.scala_version_suffix 
}}
+1.0.0
+1.x
+
+
+flink-connector-elasticsearch2{{ site.scala_version_suffix 
}}
+1.0.0
+2.x
+
+
+flink-connector-elasticsearch5{{ site.scala_version_suffix 
}}
+1.2.0
+5.x
+
+  
+
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking) for information
+about how to package the program with the libraries for cluster execution.
 
  Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
 
  Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with 
Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 
-
+
 {% highlight java %}
 DataStream<String> input = ...;
 
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name");
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
 
-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
+List<TransportAddress> transportAddresses = new ArrayList<TransportAddress>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
 
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, Object> json = new HashMap<>();
+        json.put("data", element);
+
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
 {% endhighlight %}
 
-
+
+{% highlight java %}
+DataStream input = ...;
+
+Map config = new 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821608#comment-15821608
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95972691
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties
 ---
@@ -0,0 +1,27 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+log4j.rootLogger=OFF, testlogger
--- End diff --

Are you sure this is a valid log4j2 example?

Log4j2 files seem to look different: 
http://howtodoinjava.com/log4j2/log4j-2-properties-file-configuration-example/

Is the ES5 connector pulling log4j2 as a dependency? Can we avoid that?



> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821602#comment-15821602
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95972131
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
 ---
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
--- End diff --

This license header seems to be different from the other files.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821604#comment-15821604
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95969879
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * 
+ * This class implements the common behaviour across Elasticsearch 
versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link 
ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records 
to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * 
+ * The version specific behaviours for creating a {@link Client} to 
connect to a Elasticsearch cluster
+ * should be defined by concrete implementations of a {@link 
ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // 

+   //  Internal bulk processor configuration
+   // 

+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
--- End diff --

Why are you using boxed types here?


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821603#comment-15821603
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95972955
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   @Test
+   public void testTransportClient() throws Exception {
+   runTransportClientTest();
+   }
+
+   @Test
+   public void testNullTransportClient() throws Exception {
+   runNullTransportClientTest();
+   }
+
+   @Test
+   public void testEmptyTransportClient() throws Exception {
+   runEmptyTransportClientTest();
+   }
+
+   @Test
+   public void testTransportClientFails() throws Exception {
+   runTransportClientFailsTest();
+   }
+
+	@Override
+	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+																	List<InetSocketAddress> transportAddresses,
+																	ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
--- End diff --

Whitespace

(In general, this PR contains a lot of empty lines / vertical whitespace.)


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821606#comment-15821606
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95974130
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
 ---
@@ -17,217 +17,51 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.junit.Assert;
-import org.junit.ClassRule;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ElasticsearchSinkITCase extends 
StreamingMultipleProgramsTestBase {
-
-   private static final int NUM_ELEMENTS = 20;
-
-   @ClassRule
-   public static TemporaryFolder tempFolder = new TemporaryFolder();
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
@Test
public void testTransportClient() throws Exception {
-
-   File dataDir = tempFolder.newFolder();
-
-   Node node = NodeBuilder.nodeBuilder()
-   .settings(Settings.settingsBuilder()
-   .put("path.home", 
dataDir.getParent())
-   .put("http.enabled", false)
-   .put("path.data", 
dataDir.getAbsolutePath()))
-   // set a custom cluster name to verify that 
user config works correctly
-   .clusterName("my-transport-client-cluster")
-   .node();
-
-   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-   DataStreamSource> source = 
env.addSource(new TestSourceFunction());
-
-   Map config = new HashMap<>();
-   // This instructs the sink to emit after every element, 
otherwise they would be buffered
-   config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
-   config.put("cluster.name", "my-transport-client-cluster");
-
-   // Can't use {@link TransportAddress} as its not Serializable 
in Elasticsearch 2.x
-   List transports = new ArrayList<>();
-   transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-   source.addSink(new ElasticsearchSink<>(config, transports, new 
TestElasticsearchSinkFunction()));
-
-   env.execute("Elasticsearch TransportClient Test");
-
-   // verify the results
-   Client client = node.client();
-   for (int i = 0; i < NUM_ELEMENTS; i++) {
-   GetResponse response = client.get(new 
GetRequest("my-index",
-   "my-type", 
Integer.toString(i))).actionGet();
-   Assert.assertEquals("message #" + i, 
response.getSource().get("data"));
-   }
-
-   node.close();
+   runTransportClientTest();
}
 
- @Test(expected = IllegalArgumentException.class)
- public void testNullTransportClient() throws Exception {
-
-   File dataDir = tempFolder.newFolder();
-
-   Node node = 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821601#comment-15821601
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95973995
  
--- Diff: flink-connectors/flink-connector-elasticsearch5/pom.xml ---
@@ -0,0 +1,121 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connectors
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-connector-elasticsearch5_2.10
+   flink-connector-elasticsearch5
+
+   jar
+
+   
+   
+   5.0.0
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-streaming-java_2.10
+   ${project.version}
+   provided
+   
+
+   
+   org.apache.flink
+   
flink-connector-elasticsearch-base_2.10
+   ${project.version}
+   
+   
+   org.elasticsearch
+   elasticsearch
+   
+   
+   
+
+   
+   org.elasticsearch.client
+   transport
+   ${elasticsearch.version}
+   
+
+   
+   org.apache.logging.log4j
+   log4j-api
+   2.7
+   
+   
+   org.apache.logging.log4j
+   log4j-core
+   2.7
+   
--- End diff --

How does ES5 work when executed in a Flink program? Does it write the logs 
correctly into the taskmanager.log file using log4j2?


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821609#comment-15821609
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95973065
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchClientFactoryImpl.java
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchClientFactory;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Concrete implementation of {@link ElasticsearchClientFactory} for 
Elasticsearch version 5.x.
+ *
+ * Flink Elasticsearch Sink for versions 5.x uses a {@link 
TransportClient} for communication with an Elasticsearch cluster.
+ */
+class ElasticsearchClientFactoryImpl implements ElasticsearchClientFactory 
{
+
+   private static final long serialVersionUID = -7185607275081428567L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchClientFactoryImpl.class);
+
+   /**
+*  User-provided transport addresses.
+*
+*  We are using {@link InetSocketAddress} because {@link 
TransportAddress} is not serializable in Elasticsearch 5.x.
--- End diff --

Tab indentation
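
As a side note on the serializability remark in the quoted Javadoc, a hedged sketch of the conversion such a factory has to perform when the client is actually created (assuming the ES 5.x transport client API; `transportAddresses` is the field described above, everything else is illustrative):

    // Sketch only: turn the serializable InetSocketAddress list kept by the sink
    // into TransportAddresses once the TransportClient is built on the task manager.
    // Needs org.elasticsearch.common.settings.Settings, org.elasticsearch.client.transport.TransportClient,
    // org.elasticsearch.common.transport.InetSocketTransportAddress and
    // org.elasticsearch.transport.client.PreBuiltTransportClient.
    Settings settings = Settings.builder().put("cluster.name", "my-cluster-name").build();
    TransportClient client = new PreBuiltTransportClient(settings);
    for (InetSocketAddress address : transportAddresses) {
        client.addTransportAddress(new InetSocketTransportAddress(address));
    }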


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821600#comment-15821600
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95969031
  
--- Diff: docs/dev/connectors/elasticsearch2.md ---
@@ -1,173 +0,0 @@

--- End diff --

Can you replace the es2 page with a redirect to the new page?
This way existing links are not broken.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821574#comment-15821574
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3112
  
Thanks for the pointers. We'll include the profile `include-elasticsearch5` 
for Java 8 builds only.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821556#comment-15821556
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3112
  
Using maven build profiles, you can probably include the es5 connector in 
java8 builds only.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821553#comment-15821553
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3112
  
@rmetzger 
Ah ... seems like ES 5.x requires at least Java 8.
- 
https://www.elastic.co/guide/en/elasticsearch/reference/master/_installation.html#_installation
- https://github.com/elastic/elasticsearch/issues/17584


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821544#comment-15821544
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3112
  
Thanks a lot for opening a pull request for this.
It looks like some of the tests are failing on Travis. Does ES5 support 
Java 7?


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821404#comment-15821404
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/3112

[FLINK-4988] [elasticsearch] Add Elasticsearch 5.x Connector

This PR is based on @mikedias's work in #2767 (1st commit), with additional 
work to restructure the Elasticsearch connectors (2nd commit). Basically, we 
now have a `flink-connector-elasticsearch-base` module containing common 
behaviour and test code.

### Deprecated Constructors for Elasticsearch 1.x / 2.x
As part of the restructuring, all connector versions now take an 
`ElasticsearchSinkFunction` (previously, 1.x took an `IndexRequestBuilder`, 
which was limited to indexing actions only on an Elasticsearch index) for 
fully functional Elasticsearch support.

The `ElasticsearchSinkFunction` was also relocated from package 
`o.a.f.s.c.elasticsearch2` in `flink-connector-elasticsearch2` to 
`o.a.f.s.c.elasticsearch` in `flink-connector-elasticsearch-base`.

This resulted in deprecation of the following:

1. In Elasticsearch 1.x: All original `IndexRequestBuilder` constructors as 
well as the interface itself have been deprecated.
2. In Elasticsearch 2.x: Due to the package relocation of 
`ElasticsearchSinkFunction`, all original constructors are also deprecated in 
favor of the new package path for the class.
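
For illustration, a minimal sketch of a user-supplied `ElasticsearchSinkFunction` 
under the new structure. The package names follow the relocation described above; 
the surrounding class, index and type names are placeholders, not code from this PR.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class StringIndexSinkFunction implements ElasticsearchSinkFunction<String> {

    private IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        // A plain index request; unlike the old IndexRequestBuilder, the indexer
        // below can also take other action types (updates, deletes, ...).
        return Requests.indexRequest()
                .index("my-index")   // placeholder index name
                .type("my-type")     // placeholder type name
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}
```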


R: @rmetzger @mikedias please feel free to review, thank you!

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4988

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3112.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3112


commit 746cbb4dd029837d9955cd3138444f70305ac542
Author: Mike Dias 
Date:   2016-11-07T20:09:48Z

[FLINK-4988] Elasticsearch 5.x support

commit 86482962b250899e9ac076768ff98bf8fbee58f8
Author: Tzu-Li (Gordon) Tai 
Date:   2017-01-12T13:21:56Z

[FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors




> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15818625#comment-15818625
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2767#discussion_r95601743
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements in bulk to an Elasticsearch cluster.
+ * 
+ * 
+ * The first {@link Map} passed to the constructor is forwarded to 
Elasticsearch when creating
+ * {@link TransportClient}. The config keys can be found in the 
Elasticsearch
+ * documentation. An important setting is {@code cluster.name}, this 
should be set to the name
+ * of the cluster that the sink should emit to.
+ * 
+ * Attention:  When using the {@code TransportClient} the sink will 
fail if no cluster
+ * can be connected to.
+ * 
+ * The second {@link Map} is used to configure a {@link BulkProcessor} to 
send {@link IndexRequest IndexRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * 
+ *  {@code bulk.flush.max.actions}: Maximum amount of elements to 
buffer
+ *  {@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *  {@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ * settings in milliseconds
+ * 
+ * 
+ * 
+ * You also have to provide an {@link RequestIndexer}. This is used to 
create an
+ * {@link IndexRequest} from an element that needs to be added to 
Elasticsearch. See
+ * {@link RequestIndexer} for an example.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T> {
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG 
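
Aside for readers of the archive: the bulk-flush keys quoted in the Javadoc above 
are plain strings set in the user-config maps. A minimal sketch of assembling them 
follows; the values are placeholders, and the `ElasticsearchSink` constructor call 
itself is omitted, since its exact signature is not part of the quoted hunk.

```java
import java.util.HashMap;
import java.util.Map;

public class BulkFlushConfigSketch {
    public static void main(String[] args) {
        // First map: forwarded to the TransportClient; cluster.name is the key
        // setting called out in the Javadoc.
        Map<String, String> clusterConfig = new HashMap<>();
        clusterConfig.put("cluster.name", "my-cluster"); // placeholder cluster name

        // Second map: BulkProcessor tuning via the documented keys.
        Map<String, String> bulkConfig = new HashMap<>();
        bulkConfig.put("bulk.flush.max.actions", "1000");  // flush after 1000 buffered actions
        bulkConfig.put("bulk.flush.max.size.mb", "5");     // ...or after 5 MB of buffered data
        bulkConfig.put("bulk.flush.interval.ms", "60000"); // ...or at least once per minute

        System.out.println(clusterConfig);
        System.out.println(bulkConfig);
    }
}
```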

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15816496#comment-15816496
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user mikedias commented on a diff in the pull request:

https://github.com/apache/flink/pull/2767#discussion_r95480042
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements in bulk to an Elasticsearch cluster.
+ * 
+ * 
+ * The first {@link Map} passed to the constructor is forwarded to 
Elasticsearch when creating
+ * {@link TransportClient}. The config keys can be found in the 
Elasticsearch
+ * documentation. An important setting is {@code cluster.name}, this 
should be set to the name
+ * of the cluster that the sink should emit to.
+ * 
+ * Attention:  When using the {@code TransportClient} the sink will 
fail if no cluster
+ * can be connected to.
+ * 
+ * The second {@link Map} is used to configure a {@link BulkProcessor} to 
send {@link IndexRequest IndexRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * 
+ *  {@code bulk.flush.max.actions}: Maximum amount of elements to 
buffer
+ *  {@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *  {@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ * settings in milliseconds
+ * 
+ * 
+ * 
+ * You also have to provide an {@link RequestIndexer}. This is used to 
create an
+ * {@link IndexRequest} from an element that needs to be added to 
Elasticsearch. See
+ * {@link RequestIndexer} for an example.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T> {
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814970#comment-15814970
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2767
  
Sounds good. Thank you for taking care of this @tzulitai 


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813546#comment-15813546
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2767#discussion_r95287899
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements in bulk to an Elasticsearch cluster.
+ * 
+ * 
+ * The first {@link Map} passed to the constructor is forwarded to 
Elasticsearch when creating
+ * {@link TransportClient}. The config keys can be found in the 
Elasticsearch
+ * documentation. An important setting is {@code cluster.name}, this 
should be set to the name
+ * of the cluster that the sink should emit to.
+ * 
+ * Attention:  When using the {@code TransportClient} the sink will 
fail if no cluster
+ * can be connected to.
+ * 
+ * The second {@link Map} is used to configure a {@link BulkProcessor} to 
send {@link IndexRequest IndexRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * 
+ *  {@code bulk.flush.max.actions}: Maximum amount of elements to 
buffer
+ *  {@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *  {@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ * settings in milliseconds
+ * 
+ * 
+ * 
+ * You also have to provide an {@link RequestIndexer}. This is used to 
create an
+ * {@link IndexRequest} from an element that needs to be added to 
Elasticsearch. See
+ * {@link RequestIndexer} for an example.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T> {
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG 

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812217#comment-15812217
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user mikedias commented on the issue:

https://github.com/apache/flink/pull/2767
  
@tzulitai sure, no problem! :) 


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2017-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15811558#comment-15811558
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2767
  
@mikedias @rmetzger @StephanEwen 
I've also responded to the discussion in the ML started by Robert with a +1.

A recap of the proposed approach for how we proceed:
since the ES connectors share a lot of code, we'll refactor them into a 
`flink-connector-elasticsearch-base` module and let version-specific 
connectors stem from that; basically how we're currently maintaining the 
Kafka connectors. There are two +1 votes for this approach and no other 
objections, so I think we can agree on proceeding.
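
Sketched below, for readers of the archive, is the proposed layout expressed as the 
parent's module list; the `-base` module is the one introduced by this proposal, and 
the other module names are the existing connectors discussed in this thread.

```xml
<modules>
  <module>flink-connector-elasticsearch-base</module> <!-- shared sink logic and tests -->
  <module>flink-connector-elasticsearch</module>      <!-- Elasticsearch 1.x -->
  <module>flink-connector-elasticsearch2</module>     <!-- Elasticsearch 2.x -->
  <module>flink-connector-elasticsearch5</module>     <!-- Elasticsearch 5.x -->
</modules>
```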

@mikedias, are you ok with me using your PR as a base to refactor the ES 
connectors? I'll open a new PR with your changes and mine together. I'll also 
let you know when the PR is opened :)


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15761814#comment-15761814
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user mikedias commented on the issue:

https://github.com/apache/flink/pull/2767
  
I have no problem hosting the connector in my GitHub account.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15761459#comment-15761459
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2767
  
@StephanEwen I'll start a discussion on the mailing list to decide how we 
want to proceed.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15761456#comment-15761456
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2767#discussion_r93052912
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch5/pom.xml 
---
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch5_2.10</artifactId>
+	<name>flink-connector-elasticsearch5</name>
+
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>5.0.0</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>transport</artifactId>
+			<version>${elasticsearch.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<version>2.7</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+			<version>2.7</version>
--- End diff --

Why did you add log4j2 dependencies to the project?



> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15761458#comment-15761458
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2767#discussion_r93053073
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch5/pom.xml 
---
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors</artifactId>
--- End diff --

The parent's name has changed in the meantime (we've refactored our Maven 
structure a bit for the connectors).


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15761242#comment-15761242
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2767
  
@mikedias @rmetzger How do we proceed with this pull request?
Should it become part of Flink, go to Bahir, or be hosted in the 
contributor's repository?


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692894#comment-15692894
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2767
  
If you cannot exclude the netty dependency, you could try to shade it away: 
https://maven.apache.org/plugins/maven-shade-plugin/

We do this all the time for conflicting dependencies.
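
A rough sketch of what shading Netty away could look like in the connector's pom, 
using the plugin linked above; the relocation pattern and plugin version are 
illustrative assumptions, not Flink's actual shading configuration.

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <!-- Move the connector's Netty under a Flink-specific package so it
               cannot clash with the Netty version Flink itself ships. -->
          <relocation>
            <pattern>io.netty</pattern>
            <shadedPattern>org.apache.flink.elasticsearch5.shaded.io.netty</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```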


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690831#comment-15690831
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user mikedias commented on the issue:

https://github.com/apache/flink/pull/2767
  
Not sure if I can exclude the netty4 dependency, but I'll take a look.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690828#comment-15690828
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user mikedias commented on the issue:

https://github.com/apache/flink/pull/2767
  
No, ES is not backward compatible... But we can reuse some classes or 
interfaces between versions. I have plans to do this in another PR, just to 
isolate possible issues.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689964#comment-15689964
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2767
  
From taking a quick look at it, I would suggest that this connector shades 
netty away. That way we can avoid conflicts whenever we adjust Flink's internal 
netty version.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689960#comment-15689960
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2767
  
Is Elasticsearch backwards compatible? Is there a way to support multiple 
Elasticsearch versions in one connector?
It would be nice not to have to maintain three different versions of the 
Elasticsearch connector.

@rmetzger What is your take on this?


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15645352#comment-15645352
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

GitHub user mikedias opened a pull request:

https://github.com/apache/flink/pull/2767

[FLINK-4988] Elasticsearch 5.x support

Provides Elasticsearch 5.x support based on the previous 2.x codebase. 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mikedias/flink FLINK-4988

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2767.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2767






> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)