[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-02-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3112




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-02-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r99837460
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,291 @@ specific language governing permissions and 
limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Maven Dependency</th>
+      <th class="text-left">Supported since</th>
+      <th class="text-left">Elasticsearch version</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>1.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>2.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
+        <td>1.2.0</td>
+        <td>5.x</td>
+    </tr>
+  </tbody>
+</table>
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
+about how to package the program with the libraries for cluster execution.
 
 #### Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
 
 #### Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with 
Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 
-
+
 {% highlight java %}
 DataStream<String> input = ...;
 
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
 
-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
+List<TransportAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
 
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, Object> json = new HashMap<>();
+        json.put("data", element);
+
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
 {% endhighlight %}
 
-
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1");
+
+List 
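The quoted diff breaks off above. For reference, the Elasticsearch 2.x tab it starts continues along the same lines as the 1.x example; the following is only a sketch of that variant (assuming the `input` stream and `config` map from the first tab), where plain `InetSocketAddress` is used because `TransportAddress` is not Serializable in Elasticsearch 2.x, as the test code quoted later in this thread notes:

{% highlight java %}
// Sketch of the 2.x tab; assumes `input` and `config` from the 1.x example above.
List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress("127.0.0.1", 9300));
transportAddresses.add(new InetSocketAddress("10.2.3.1", 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        // Build a one-field JSON document from the stream element.
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
{% endhighlight %}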

[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-02-07 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r99783050
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,291 @@ specific language governing permissions and 
limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Maven Dependency</th>
+      <th class="text-left">Supported since</th>
+      <th class="text-left">Elasticsearch version</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>1.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>2.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
+        <td>1.2.0</td>
+        <td>5.x</td>
+    </tr>
+  </tbody>
+</table>
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
+about how to package the program with the libraries for cluster execution.
 
 #### Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
 
 #### Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with 
Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 
-
+
 {% highlight java %}
 DataStream<String> input = ...;
 
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
 
-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
+List<TransportAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
 
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, Object> json = new HashMap<>();
+        json.put("data", element);
+
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
 {% endhighlight %}
 
-
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
--- End diff --

; missing



[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-02-07 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r99782995
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,291 @@ specific language governing permissions and 
limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Maven Dependency</th>
+      <th class="text-left">Supported since</th>
+      <th class="text-left">Elasticsearch version</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>1.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>2.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
+        <td>1.2.0</td>
+        <td>5.x</td>
+    </tr>
+  </tbody>
+</table>
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
+about how to package the program with the libraries for cluster execution.
 
 #### Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
 
 #### Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with 
Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 
-
+
 {% highlight java %}
 DataStream<String> input = ...;
 
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
--- End diff --

There's a semicolon missing




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-02-07 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r99783072
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,291 @@ specific language governing permissions and 
limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Maven Dependency</th>
+      <th class="text-left">Supported since</th>
+      <th class="text-left">Elasticsearch version</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>1.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>2.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
+        <td>1.2.0</td>
+        <td>5.x</td>
+    </tr>
+  </tbody>
+</table>
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking.html)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
+about how to package the program with the libraries for cluster execution.
 
 #### Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
 
 #### Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with 
Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 
-
+
 {% highlight java %}
 DataStream<String> input = ...;
 
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
 
-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
+List<TransportAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
 
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, Object> json = new HashMap<>();
+        json.put("data", element);
+
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
 {% endhighlight %}
 
-
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1");
+
+List 

[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-28 Thread mikedias
Github user mikedias commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r98340529
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>
+ * This class implements the common behaviour across Elasticsearch versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * <p>
+ * The version specific behaviours for creating a {@link Client} to connect to an Elasticsearch cluster
+ * should be defined by concrete implementations of an {@link ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
+
+   // ------------------------------------------------------------------------
+   //  User-facing API and configuration
+   // ------------------------------------------------------------------------
+
+   /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+   private final Map<String, String> userConfig;
+
+   /** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
+   private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+   /** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
+   private transient BulkProcessorIndexer requestIndexer;
+
+   // ------------------------------------------------------------------------
+   //  Internals for 

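The three `CONFIG_KEY_BULK_FLUSH_*` settings quoted above map directly onto Elasticsearch's `BulkProcessor` builder. A minimal sketch of that wiring (assuming an already-created `Client` and `BulkProcessor.Listener`; not the exact Flink code), using the types imported in the quoted file:

{% highlight java %}
// Sketch: build a BulkProcessor from the three nullable flush settings.
// A null value means "keep the Elasticsearch default" for that knob.
static BulkProcessor createBulkProcessor(
        Client client,
        BulkProcessor.Listener listener,
        Integer maxActions,
        Integer maxSizeMb,
        Integer flushIntervalMillis) {

    BulkProcessor.Builder builder = BulkProcessor.builder(client, listener);
    if (maxActions != null) {
        builder.setBulkActions(maxActions);
    }
    if (maxSizeMb != null) {
        builder.setBulkSize(new ByteSizeValue(maxSizeMb, ByteSizeUnit.MB));
    }
    if (flushIntervalMillis != null) {
        builder.setFlushInterval(TimeValue.timeValueMillis(flushIntervalMillis));
    }
    return builder.build();
}
{% endhighlight %}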
[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-28 Thread mikedias
Github user mikedias commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r98340549
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>
+ * This class implements the common behaviour across Elasticsearch versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * <p>
+ * The version specific behaviours for creating a {@link Client} to connect to an Elasticsearch cluster
+ * should be defined by concrete implementations of an {@link ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
+
+   // ------------------------------------------------------------------------
+   //  User-facing API and configuration
+   // ------------------------------------------------------------------------
+
+   /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+   private final Map<String, String> userConfig;
+
+   /** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
+   private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+   /** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
+   private transient BulkProcessorIndexer requestIndexer;
+
+   // ------------------------------------------------------------------------
+   //  Internals for 

[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-28 Thread mikedias
Github user mikedias commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r98340492
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>
+ * This class implements the common behaviour across Elasticsearch versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * <p>
+ * The version specific behaviours for creating a {@link Client} to connect to an Elasticsearch cluster
+ * should be defined by concrete implementations of an {@link ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
+
+   // ------------------------------------------------------------------------
+   //  User-facing API and configuration
+   // ------------------------------------------------------------------------
+
+   /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+   private final Map<String, String> userConfig;
+
+   /** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
+   private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+   /** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
+   private transient BulkProcessorIndexer requestIndexer;
+
+   // ------------------------------------------------------------------------
+   //  Internals for 

[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r96274454
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>
+ * This class implements the common behaviour across Elasticsearch versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * <p>
+ * The version specific behaviours for creating a {@link Client} to connect to an Elasticsearch cluster
+ * should be defined by concrete implementations of an {@link ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
+
+   // ------------------------------------------------------------------------
+   //  User-facing API and configuration
+   // ------------------------------------------------------------------------
+
+   /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+   private final Map<String, String> userConfig;
+
+   /** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
+   private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+   /** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
+   private transient BulkProcessorIndexer requestIndexer;
+
+   // ------------------------------------------------------------------------
+   //  Internals for 

[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-16 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r96265039
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>
+ * This class implements the common behaviour across Elasticsearch versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * <p>
+ * The version specific behaviours for creating a {@link Client} to connect to an Elasticsearch cluster
+ * should be defined by concrete implementations of an {@link ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
+
+   // ------------------------------------------------------------------------
+   //  User-facing API and configuration
+   // ------------------------------------------------------------------------
+
+   /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */
+   private final Map<String, String> userConfig;
+
+   /** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. */
+   private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+   /** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
+   private transient BulkProcessorIndexer requestIndexer;
+
+   // ------------------------------------------------------------------------
+   //  Internals 

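The `ParameterTool` import in the quoted file suggests how the user config map gets parsed. A hedged sketch of that style of parsing, using Flink's `ParameterTool.fromMap` and leaving unset keys as `null` (not necessarily the exact Flink code):

{% highlight java %}
// Sketch: pull the optional bulk-flush settings out of the user config map.
ParameterTool params = ParameterTool.fromMap(userConfig);

Integer bulkProcessorFlushMaxActions = params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)
        ? params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)
        : null;

Integer bulkProcessorFlushMaxSizeMb = params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)
        ? params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)
        : null;
{% endhighlight %}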
[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95979696
  
--- Diff: .travis.yml ---
@@ -16,14 +16,14 @@ matrix:
   include:
   # Always run test groups A and B together
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 
-Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis 
-Dmaven.javadoc.skip=true"
+  env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 
-Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,include-elasticsearch5 
-Dmaven.javadoc.skip=true"
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 
-Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis 
-Dmaven.javadoc.skip=true"
+  env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 
-Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis,include-elasticsearch5 
-Dmaven.javadoc.skip=true"
 
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.6.3 
-Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis 
-Dmaven.javadoc.skip=true"
+  env: PROFILE="-Dhadoop.version=2.6.3 
-Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,include-elasticsearch5 
-Dmaven.javadoc.skip=true"
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.6.3 
-Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis 
-Dmaven.javadoc.skip=true"
+  env: PROFILE="-Dhadoop.version=2.6.3 
-Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis,include-elasticsearch5 
-Dmaven.javadoc.skip=true"
--- End diff --

With the automatic activation, you don't need these changes (see my other 
comment at the profile def)




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95979634
  
--- Diff: flink-connectors/pom.xml ---
@@ -85,6 +86,17 @@ under the License.
flink-connector-kinesis


+
+   
+   
+   include-elasticsearch5
--- End diff --

Can you make this a profile that activates itself automatically when java 8 
is available?
http://maven.apache.org/guides/introduction/introduction-to-profiles.html
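For readers unfamiliar with the linked guide: a Maven profile can activate itself based on the JDK version, which removes the need to list it explicitly in every Travis `-P` switch. A sketch of what such a profile could look like (illustrative only, not the final pom.xml change):

{% highlight xml %}
<profile>
    <!-- Sketch: self-activates on Java 8 or newer, so the Travis
         profile lists would not need an explicit entry. -->
    <id>include-elasticsearch5</id>
    <activation>
        <jdk>[1.8,)</jdk>
    </activation>
    <modules>
        <module>flink-connector-elasticsearch5</module>
    </modules>
</profile>
{% endhighlight %}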




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95979234
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
 ---
@@ -17,217 +17,51 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.junit.Assert;
-import org.junit.ClassRule;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ElasticsearchSinkITCase extends 
StreamingMultipleProgramsTestBase {
-
-   private static final int NUM_ELEMENTS = 20;
-
-   @ClassRule
-   public static TemporaryFolder tempFolder = new TemporaryFolder();
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
@Test
public void testTransportClient() throws Exception {
-
-   File dataDir = tempFolder.newFolder();
-
-   Node node = NodeBuilder.nodeBuilder()
-   .settings(Settings.settingsBuilder()
-   .put("path.home", 
dataDir.getParent())
-   .put("http.enabled", false)
-   .put("path.data", 
dataDir.getAbsolutePath()))
-   // set a custom cluster name to verify that 
user config works correctly
-   .clusterName("my-transport-client-cluster")
-   .node();
-
-   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-   DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-   Map<String, String> config = new HashMap<>();
-   // This instructs the sink to emit after every element, 
otherwise they would be buffered
-   config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
-   config.put("cluster.name", "my-transport-client-cluster");
-
-   // Can't use {@link TransportAddress} as it's not Serializable in Elasticsearch 2.x
-   List<InetSocketAddress> transports = new ArrayList<>();
-   transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-   source.addSink(new ElasticsearchSink<>(config, transports, new 
TestElasticsearchSinkFunction()));
-
-   env.execute("Elasticsearch TransportClient Test");
-
-   // verify the results
-   Client client = node.client();
-   for (int i = 0; i < NUM_ELEMENTS; i++) {
-   GetResponse response = client.get(new 
GetRequest("my-index",
-   "my-type", 
Integer.toString(i))).actionGet();
-   Assert.assertEquals("message #" + i, 
response.getSource().get("data"));
-   }
-
-   node.close();
+   runTransportClientTest();
}
 
- @Test(expected = IllegalArgumentException.class)
- public void testNullTransportClient() throws Exception {
-
-   File dataDir = tempFolder.newFolder();
-
-   Node node = NodeBuilder.nodeBuilder()
-   .settings(Settings.settingsBuilder()
-   .put("path.home", dataDir.getParent())
-   .put("http.enabled", false)
-   .put("path.data", 

[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95979205
  
--- Diff: docs/dev/connectors/elasticsearch2.md ---
@@ -1,173 +0,0 @@

--- End diff --

Will do!




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95979032
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>
+ * This class implements the common behaviour across Elasticsearch versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * <p>
+ * The version specific behaviours for creating a {@link Client} to connect to an Elasticsearch cluster
+ * should be defined by concrete implementations of an {@link ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
--- End diff --

Ah, I see. You need boxed types for nullability. Then I would actually keep 
it as is.
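The point in one illustrative fragment (hypothetical names, assuming a `Map<String, String> userConfig`): a primitive `int` cannot express "not configured", while a boxed `Integer` can be `null`:

{% highlight java %}
// Boxed: null cleanly means the user did not set a flush threshold.
Integer maxActions = userConfig.containsKey("bulk.flush.max.actions")
        ? Integer.valueOf(userConfig.get("bulk.flush.max.actions"))
        : null;

// Primitive: would force a sentinel such as -1, which is easy to misuse.
int maxActionsOrDefault = (maxActions != null) ? maxActions : -1;
{% endhighlight %}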




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95978090
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
 ---
@@ -17,217 +17,51 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.junit.Assert;
-import org.junit.ClassRule;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ElasticsearchSinkITCase extends 
StreamingMultipleProgramsTestBase {
-
-   private static final int NUM_ELEMENTS = 20;
-
-   @ClassRule
-   public static TemporaryFolder tempFolder = new TemporaryFolder();
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
@Test
public void testTransportClient() throws Exception {
-
-   File dataDir = tempFolder.newFolder();
-
-   Node node = NodeBuilder.nodeBuilder()
-   .settings(Settings.settingsBuilder()
-   .put("path.home", 
dataDir.getParent())
-   .put("http.enabled", false)
-   .put("path.data", 
dataDir.getAbsolutePath()))
-   // set a custom cluster name to verify that 
user config works correctly
-   .clusterName("my-transport-client-cluster")
-   .node();
-
-   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-   DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-   Map<String, String> config = new HashMap<>();
-   // This instructs the sink to emit after every element, 
otherwise they would be buffered
-   config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
-   config.put("cluster.name", "my-transport-client-cluster");
-
-   // Can't use {@link TransportAddress} as it's not Serializable in Elasticsearch 2.x
-   List<InetSocketAddress> transports = new ArrayList<>();
-   transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-   source.addSink(new ElasticsearchSink<>(config, transports, new 
TestElasticsearchSinkFunction()));
-
-   env.execute("Elasticsearch TransportClient Test");
-
-   // verify the results
-   Client client = node.client();
-   for (int i = 0; i < NUM_ELEMENTS; i++) {
-   GetResponse response = client.get(new 
GetRequest("my-index",
-   "my-type", 
Integer.toString(i))).actionGet();
-   Assert.assertEquals("message #" + i, 
response.getSource().get("data"));
-   }
-
-   node.close();
+   runTransportClientTest();
}
 
- @Test(expected = IllegalArgumentException.class)
- public void testNullTransportClient() throws Exception {
-
-   File dataDir = tempFolder.newFolder();
-
-   Node node = NodeBuilder.nodeBuilder()
-   .settings(Settings.settingsBuilder()
-   .put("path.home", dataDir.getParent())
-   .put("http.enabled", false)
-   .put("path.data", 

[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95978123
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,284 @@ specific language governing permissions and 
limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Maven Dependency</th>
+      <th class="text-left">Supported since</th>
+      <th class="text-left">Elasticsearch version</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>1.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
+        <td>1.0.0</td>
+        <td>2.x</td>
+    </tr>
+    <tr>
+        <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
+        <td>1.2.0</td>
+        <td>5.x</td>
+    </tr>
+  </tbody>
+</table>
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking) for information
+about how to package the program with the libraries for cluster execution.
 
 #### Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against 
your cluster.
 
 #### Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with 
Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See 
[here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 
-
+
 {% highlight java %}
 DataStream<String> input = ...;
 
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
 
-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
+List<InetSocketTransportAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
 
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, String> json = new HashMap<>();
+        json.put("data", element);
+
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
 {% endhighlight %}
 
-
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1");
+
+List transportAddresses = 

[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95977827
  
--- Diff: flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   @Test
+   public void testTransportClient() throws Exception {
+   runTransportClientTest();
+   }
+
+   @Test
+   public void testNullTransportClient() throws Exception {
+   runNullTransportClientTest();
+   }
+
+   @Test
+   public void testEmptyTransportClient() throws Exception {
+   runEmptyTransportClientTest();
+   }
+
+   @Test
+   public void testTransportClientFails() throws Exception {
+   runTransportClientFailsTest();
+   }
+
+   @Override
+   protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+                                                                   List<InetSocketAddress> transportAddresses,
+                                                                   ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
--- End diff --

I'll double-check the styling in this PR nevertheless.




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95977656
  
--- Diff: flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   @Test
+   public void testTransportClient() throws Exception {
+   runTransportClientTest();
+   }
+
+   @Test
+   public void testNullTransportClient() throws Exception {
+   runNullTransportClientTest();
+   }
+
+   @Test
+   public void testEmptyTransportClient() throws Exception {
+   runEmptyTransportClientTest();
+   }
+
+   @Test
+   public void testTransportClientFails() throws Exception {
+   runTransportClientFailsTest();
+   }
+
+   @Override
+   protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+                                                                   List<InetSocketAddress> transportAddresses,
+                                                                   ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
--- End diff --

Do you mean the empty line? I don't think there are trailing whitespaces.
It isn't actually an empty line; the method parameters are at the far right.
You need to drag the horizontal scrollbar to see them, otherwise in the IDE they
appear to be aligned.




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95977306
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>
+ * This class implements the common behaviour across Elasticsearch versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * <p>
+ * The version specific behaviours for creating a {@link Client} to connect to a Elasticsearch cluster
+ * should be defined by concrete implementations of a {@link ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
--- End diff --

I'll change this to `int` and use special values to represent that the user 
hasn't set a value.
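
A minimal sketch of that sentinel approach (the class and constant names here are illustrative, not the PR's actual code):

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a plain int with a sentinel value replaces the boxed
// Integer fields, where -1 means "not configured by the user".
public class BulkFlushConfigSketch {

    private static final int NOT_SET = -1;

    private final int bulkFlushMaxActions;

    public BulkFlushConfigSketch(Map<String, String> userConfig) {
        String maxActions = userConfig.remove("bulk.flush.max.actions");
        this.bulkFlushMaxActions = maxActions != null ? Integer.parseInt(maxActions) : NOT_SET;
    }

    public boolean isBulkFlushMaxActionsSet() {
        return bulkFlushMaxActions != NOT_SET;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        System.out.println(new BulkFlushConfigSketch(config).isBulkFlushMaxActionsSet()); // prints: true
    }
}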




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95968855
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -23,158 +23,284 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
+This connector provides sinks that can request document actions to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
+of the following dependencies to your project, depending on the version
+of the Elasticsearch installation:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Maven Dependency</th>
+      <th class="text-left">Supported since</th>
+      <th class="text-left">Elasticsearch version</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
+      <td>1.0.0</td>
+      <td>1.x</td>
+    </tr>
+    <tr>
+      <td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
+      <td>1.0.0</td>
+      <td>2.x</td>
+    </tr>
+    <tr>
+      <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
+      <td>1.2.0</td>
+      <td>5.x</td>
+    </tr>
+  </tbody>
+</table>
 
 Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking)
-for information about how to package the program with the libraries for
-cluster execution.
+distribution. See [here]({{site.baseurl}}/dev/linking) for information
+about how to package the program with the libraries for cluster execution.
 
  Installing Elasticsearch
 
 Instructions for setting up an Elasticsearch cluster can be found
 
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
 Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
+creating an `ElasticsearchSink` for requesting document actions against your cluster.
 
  Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
 
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
+The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+Elasticsearch cluster.
 
-This code shows how to create a sink that uses an embedded Node for
-communication:
+The example below shows how to configure and create a sink:
 
 
-
+
 {% highlight java %}
 DataStream<String> input = ...;
 
-Map<String, String> config = Maps.newHashMap();
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
 // This instructs the sink to emit after every element, otherwise they would be buffered
 config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
 
-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
+List<InetSocketTransportAddress> transportAddresses = new ArrayList<>();
+transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
+transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
 
+input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
+    public IndexRequest createIndexRequest(String element) {
+        Map<String, String> json = new HashMap<>();
+        json.put("data", element);
+
         return Requests.indexRequest()
                 .index("my-index")
                 .type("my-type")
                 .source(json);
     }
+
+    @Override
+    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+        indexer.add(createIndexRequest(element));
+    }
 }));
 {% endhighlight %}
 
-
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name")
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1");
+
+List transportAddresses = 

[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95973065
  
--- Diff: flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchClientFactoryImpl.java ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchClientFactory;
+import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Concrete implementation of {@link ElasticsearchClientFactory} for Elasticsearch version 5.x.
+ *
+ * Flink Elasticsearch Sink for versions 5.x uses a {@link TransportClient} for communication with an Elasticsearch cluster.
+ */
+class ElasticsearchClientFactoryImpl implements ElasticsearchClientFactory {
+
+   private static final long serialVersionUID = -7185607275081428567L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchClientFactoryImpl.class);
+
+   /**
+*  User-provided transport addresses.
+*
+    *  We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 5.x.
--- End diff --

Tab indentation




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95972955
  
--- Diff: flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch5;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   @Test
+   public void testTransportClient() throws Exception {
+   runTransportClientTest();
+   }
+
+   @Test
+   public void testNullTransportClient() throws Exception {
+   runNullTransportClientTest();
+   }
+
+   @Test
+   public void testEmptyTransportClient() throws Exception {
+   runEmptyTransportClientTest();
+   }
+
+   @Test
+   public void testTransportClientFails() throws Exception {
+   runTransportClientFailsTest();
+   }
+
+   @Override
+   protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+                                                                   List<InetSocketAddress> transportAddresses,
+                                                                   ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
--- End diff --

Whitespace

(In general, this PR contains a lot of empty lines / vertical whitespace.)




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95972691
  
--- Diff: flink-connectors/flink-connector-elasticsearch5/src/test/resources/log4j2.properties ---
@@ -0,0 +1,27 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+log4j.rootLogger=OFF, testlogger
--- End diff --

Are you sure this is a valid log4j2 example?

Log4j2 properties files seem to look different: 
http://howtodoinjava.com/log4j2/log4j-2-properties-file-configuration-example/

Is the ES5 connector pulling log4j2 as a dependency? Can we avoid that?
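
For comparison, a log4j2-style properties file would look roughly like this (a minimal sketch in log4j2 syntax, which is supported since Log4j 2.4; the appender name and pattern are illustrative, not taken from the PR):

# log4j2 properties syntax differs from the log4j 1.x style used above:
status = error

appender.console.type = Console
appender.console.name = ConsoleAppender
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

rootLogger.level = off
rootLogger.appenderRef.console.ref = ConsoleAppender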





[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95969031
  
--- Diff: docs/dev/connectors/elasticsearch2.md ---
@@ -1,173 +0,0 @@

--- End diff --

Can you replace the es2 page with a redirect to the new page?
This way existing links are not broken.
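
One way such a stub could look (hypothetical sketch; the exact front matter and redirect mechanism depend on how the docs build handles redirects):

---
title: "Elasticsearch 2.x Connector"
nav-parent_id: connectors
---

<meta http-equiv="refresh" content="0; url={{ site.baseurl }}/dev/connectors/elasticsearch.html">

This page has moved. See the combined
[Elasticsearch Connector page]({{ site.baseurl }}/dev/connectors/elasticsearch.html).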




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95974130
  
--- Diff: flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java ---
@@ -17,217 +17,51 @@
  */
 package org.apache.flink.streaming.connectors.elasticsearch2;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.junit.Assert;
-import org.junit.ClassRule;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
-
-   private static final int NUM_ELEMENTS = 20;
-
-   @ClassRule
-   public static TemporaryFolder tempFolder = new TemporaryFolder();
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
@Test
public void testTransportClient() throws Exception {
-
-   File dataDir = tempFolder.newFolder();
-
-   Node node = NodeBuilder.nodeBuilder()
-       .settings(Settings.settingsBuilder()
-           .put("path.home", dataDir.getParent())
-           .put("http.enabled", false)
-           .put("path.data", dataDir.getAbsolutePath()))
-       // set a custom cluster name to verify that user config works correctly
-       .clusterName("my-transport-client-cluster")
-       .node();
-
-   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-   DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
-   Map<String, String> config = new HashMap<>();
-   // This instructs the sink to emit after every element, otherwise they would be buffered
-   config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-   config.put("cluster.name", "my-transport-client-cluster");
-
-   // Can't use {@link TransportAddress} as its not Serializable in Elasticsearch 2.x
-   List<InetSocketAddress> transports = new ArrayList<>();
-   transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-   source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction()));
-
-   env.execute("Elasticsearch TransportClient Test");
-
-   // verify the results
-   Client client = node.client();
-   for (int i = 0; i < NUM_ELEMENTS; i++) {
-       GetResponse response = client.get(new GetRequest("my-index", "my-type", Integer.toString(i))).actionGet();
-       Assert.assertEquals("message #" + i, response.getSource().get("data"));
-   }
-
-   node.close();
+   runTransportClientTest();
}
 
- @Test(expected = IllegalArgumentException.class)
- public void testNullTransportClient() throws Exception {
-
-   File dataDir = tempFolder.newFolder();
-
-   Node node = NodeBuilder.nodeBuilder()
-   .settings(Settings.settingsBuilder()
-   .put("path.home", dataDir.getParent())
-   .put("http.enabled", false)
-   .put("path.data", 

[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95973995
  
--- Diff: flink-connectors/flink-connector-elasticsearch5/pom.xml ---
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch5_2.10</artifactId>
+	<name>flink-connector-elasticsearch5</name>
+
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>5.0.0</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.elasticsearch</groupId>
+					<artifactId>elasticsearch</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>transport</artifactId>
+			<version>${elasticsearch.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<version>2.7</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+			<version>2.7</version>
+		</dependency>
--- End diff --

How does ES5 work when executed in a Flink program? Does it write the logs
correctly into the taskmanager.log file using log4j2?
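
If the extra logging backend turns out to be a problem, one option (hypothetical, not part of this PR) would be to route the client's log4j2 calls into SLF4J via the log4j-to-slf4j bridge instead of shipping log4j-core:

<dependency>
	<groupId>org.apache.logging.log4j</groupId>
	<artifactId>log4j-to-slf4j</artifactId>
	<version>2.7</version>
</dependency>

With that bridge on the classpath, the Elasticsearch client's log4j2 API calls would end up in whatever SLF4J backend Flink is configured with, e.g. the taskmanager.log appenders.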




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95969123
  
--- Diff: docs/dev/connectors/elasticsearch2.md ---
@@ -1,173 +0,0 @@

-title: "Elasticsearch 2.x Connector"
-nav-title: Elasticsearch 2.x
-nav-parent_id: connectors
-nav-pos: 5

-
-
-This connector provides a Sink that can write to an
-[Elasticsearch 2.x](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/linking)
-for information about how to package the program with the libraries for
-cluster execution.
-
- Installing Elasticsearch 2.x
-
-Instructions for setting up an Elasticsearch cluster can be found

-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
-
- Elasticsearch 2.x Sink
-The connector provides a Sink that can send data to an Elasticsearch 2.x Index.
-
-The sink communicates with Elasticsearch via Transport Client
-
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html)
-for information about the Transport Client.
-
-The code below shows how to create a sink that uses a `TransportClient` for communication:
-
-
-
-{% highlight java %}
-File dataDir = ;
-
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
-
-List<InetSocketAddress> transports = new ArrayList<>();
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction<String>() {
-  public IndexRequest createIndexRequest(String element) {
-    Map<String, Object> json = new HashMap<>();
-    json.put("data", element);
-
-    return Requests.indexRequest()
-            .index("my-index")
-            .type("my-type")
-            .source(json);
-  }
-
-  @Override
-  public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-    indexer.add(createIndexRequest(element));
-  }
-}));
-{% endhighlight %}
-
-
-{% highlight scala %}
-val dataDir = ;
-
-val input: DataStream[String] = ...
-
-val config = new util.HashMap[String, String]
-config.put("bulk.flush.max.actions", "1")
-config.put("cluster.name", "my-cluster-name")
-
-val transports = new ArrayList[String]
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 
9300))
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 
9300));
-
-input.addSink(new ElasticsearchSink(config, transports, new 
ElasticsearchSinkFunction[String] {
-  def createIndexRequest(element: String): IndexRequest = {
-val json = new util.HashMap[String, AnyRef]
-json.put("data", element)
-Requests.indexRequest.index("my-index").`type`("my-type").source(json)
-  }
-
-  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
-indexer.add(createIndexRequest(element))
-  }
-}))
-{% endhighlight %}
-
-
-
-A Map of Strings is used to configure the Sink. The configuration keys
-are documented in the Elasticsearch documentation

-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
-Especially important is the `cluster.name` parameter that must correspond to
-the name of your cluster and with ElasticSearch 2x you also need to specify `path.home`.
-
-Internally, the sink uses a `BulkProcessor` to send Action requests to the cluster.
-This will buffer elements and Action Requests before sending to the cluster. The behaviour of the
-`BulkProcessor` can be configured using these config keys:
- * **bulk.flush.max.actions**: Maximum amount of elements to buffer
- * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
- * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two
-  settings in milliseconds
-
-This 

[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95972131
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java ---
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
--- End diff --

This license header seems to be different from the other files.




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3112#discussion_r95969879
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>
+ * This class implements the common behaviour across Elasticsearch versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * <p>
+ * The version specific behaviours for creating a {@link Client} to connect to a Elasticsearch cluster
+ * should be defined by concrete implementations of a {@link ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+
+   private static final long serialVersionUID = -1007596293618451942L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+   // ------------------------------------------------------------------------
+   //  Internal bulk processor configuration
+   // ------------------------------------------------------------------------
+
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+   public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+   public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+   private final Integer bulkProcessorFlushMaxActions;
+   private final Integer bulkProcessorFlushMaxSizeMb;
+   private final Integer bulkProcessorFlushIntervalMillis;
--- End diff --

Why are you using boxed types here?




[GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...

2017-01-13 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/3112

[FLINK-4988] [elasticsearch] Add Elasticsearch 5.x Connector

This PR is based on @mikedias's work in #2767 (1st commit), with additional 
work to restructure the Elasticsearch connectors (2nd commit). Basically, we 
now have a `flink-connector-elasticsearch-base` containing common behaviour and 
test code.

### Deprecated Constructors for Elasticsearch 1.x / 2.x
As part of the restructuring, all connector versions now take an 
`ElasticsearchSinkFunction` (previously, 1.x took an `IndexRequestBuilder`, 
which was limited to only indexing actions on an Elasticsearch index) for fully 
functional Elasticsearch support.

The `ElasticsearchSinkFunction` was also relocated from package 
`o.a.f.s.c.elasticsearch2` in `flink-connector-elasticsearch2`, to 
`o.a.f.s.c.elasticsearch` in `flink-connector-elasticsearch-base`.

This resulted in deprecation of the following:

1. In Elasticsearch 1.x: All original `IndexRequestBuilder` constructors as 
well as the interface itself have been deprecated; see the migration sketch 
after this list.
2. In Elasticsearch 2.x: Due to the package relocation of 
`ElasticsearchSinkFunction`, all original constructors are also deprecated in 
favor of the new package path for the class.
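
For users migrating, a minimal sketch of the replacement (illustrative, pieced together from the docs examples in this PR, not copied from its code):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

// Replaces a deprecated IndexRequestBuilder<String>: the same index request is
// built, but handed to a RequestIndexer, which also accepts non-index actions.
public class IndexingSinkFunction implements ElasticsearchSinkFunction<String> {

    private IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}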


R: @rmetzger @mikedias please feel free to review, thank you!

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4988

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3112.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3112


commit 746cbb4dd029837d9955cd3138444f70305ac542
Author: Mike Dias 
Date:   2016-11-07T20:09:48Z

[FLINK-4988] Elasticsearch 5.x support

commit 86482962b250899e9ac076768ff98bf8fbee58f8
Author: Tzu-Li (Gordon) Tai 
Date:   2017-01-12T13:21:56Z

[FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors



