[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2021-04-27 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334323#comment-17334323
 ] 

Flink Jira Bot commented on FLINK-3857:
---

This issue was marked "stale-assigned" and has not received an update in 7 
days. It is now automatically unassigned. If you are still working on it, you 
can assign it to yourself again. Please also give an update about the status of 
the work.

> Add reconnect attempt to Elasticsearch host
> ---
>
> Key: FLINK-3857
> URL: https://issues.apache.org/jira/browse/FLINK-3857
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Fabian Hueske
>Assignee: Subhobrata Dey
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, the connection to the Elasticsearch host is opened in 
> {{ElasticsearchSink.open()}}. In case the connection is lost (maybe due to a 
> changed DNS entry), the sink fails.
> I propose to catch the Exception for lost connections in the {{invoke()}} 
> method and try to re-open the connection for a configurable number of times 
> with a certain delay.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2021-04-16 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17323719#comment-17323719
 ] 

Flink Jira Bot commented on FLINK-3857:
---

This issue is assigned but has not received an update in 7 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.



[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056966#comment-16056966
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r123150036
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -208,6 +222,13 @@ public void invoke(T value) throws Exception {
checkErrorAndRethrow();
 
elasticsearchSinkFunction.process(value, getRuntimeContext(), 
requestIndexer);
+
+   // if there is a connectivity failure, then retry
+   if (failureThrowable.get() != null &&
--- End diff --

I'm wondering whether or not checking the exception type would be enough 
for verifying the connectivity failure.
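
To illustrate what a check based on the exception type alone might look like, here is a minimal sketch that walks the cause chain and looks for JDK network exceptions. The class and method names are hypothetical and not part of the PR. An Elasticsearch-specific exception would have to be added to the list, and a type check by itself may still be insufficient to confirm a connectivity failure.

```java
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.UnknownHostException;

/** Hypothetical helper, not part of the PR under review. */
final class ConnectivityFailures {
    private ConnectivityFailures() {}

    /**
     * Returns true if the throwable, or one of its causes, is one of the
     * JDK network exceptions listed below. The depth limit guards against
     * cyclic cause chains.
     */
    static boolean isConnectivityFailure(Throwable t) {
        int depth = 0;
        for (Throwable cur = t; cur != null && depth < 32; cur = cur.getCause(), depth++) {
            if (cur instanceof ConnectException
                    || cur instanceof UnknownHostException
                    || cur instanceof NoRouteToHostException) {
                return true;
            }
        }
        return false;
    }
}
```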


> Add reconnect attempt to Elasticsearch host
> ---
>
> Key: FLINK-3857
> URL: https://issues.apache.org/jira/browse/FLINK-3857
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.1.0, 1.0.2
>Reporter: Fabian Hueske
>Assignee: Subhobrata Dey
>
> Currently, the connection to the Elasticsearch host is opened in 
> {{ElasticsearchSink.open()}}. In case the connection is lost (maybe due to a 
> changed DNS entry), the sink fails.
> I propose to catch the Exception for lost connections in the {{invoke()}} 
> method and try to re-open the connection for a configurable number of times 
> with a certain delay.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056967#comment-16056967
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r123150100
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -234,4 +255,36 @@ private void checkErrorAndRethrow() {
throw new RuntimeException("An error occured in 
ElasticsearchSink.", cause);
}
}
+
+   private void retry(T value) throws Exception {
+   int retryCounter = 1;
+
+   while (retryCounter <= connectionRetries) {
+   if (bulkProcessor != null) {
+   bulkProcessor.close();
+   bulkProcessor = null;
+   }
+
+   if (client != null) {
+   client.close();
+   }
+
+   try {
+   open(null);
--- End diff --

This open() call seems a bit odd to me. I don't think it's good practice 
to call it here, since it's essentially a lifecycle method used by the system.
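
One way to avoid calling the lifecycle method from the retry path is to move client creation into a private helper that both `open()` and the reconnect logic use. The sketch below is only an illustration under that assumption: the class, the `clientFactory` supplier, and the method names are hypothetical and do not reflect the actual `ElasticsearchSinkBase` code.

```java
import java.util.function.Supplier;

/** Hypothetical sink skeleton: open() and the reconnect path share one helper. */
final class ReconnectingSinkSketch<C extends AutoCloseable> {
    // Assumed factory that stands in for the connector's client creation.
    private final Supplier<C> clientFactory;
    private C client;
    private int connectCount;

    ReconnectingSinkSketch(Supplier<C> clientFactory) {
        this.clientFactory = clientFactory;
    }

    /** Lifecycle method: meant to be called once by the framework. */
    void open() {
        createClient();
    }

    /** Reconnect path: replaces the client without going through open(). */
    void reconnect() {
        if (client != null) {
            try {
                client.close();
            } catch (Exception e) {
                // Best-effort close; the old client is discarded either way.
            }
            client = null;
        }
        createClient();
    }

    private void createClient() {
        client = clientFactory.get();
        connectCount++;
    }

    /** Number of clients created so far (for illustration only). */
    int connectCount() {
        return connectCount;
    }
}
```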




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056969#comment-16056969
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r123149965
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -234,4 +255,36 @@ private void checkErrorAndRethrow() {
throw new RuntimeException("An error occured in 
ElasticsearchSink.", cause);
}
}
+
+   private void retry(T value) throws Exception {
+   int retryCounter = 1;
+
+   while (retryCounter <= connectionRetries) {
+   if (bulkProcessor != null) {
+   bulkProcessor.close();
+   bulkProcessor = null;
+   }
+
+   if (client != null) {
+   client.close();
+   }
+
+   try {
+   open(null);
+   elasticsearchSinkFunction.process(value, 
getRuntimeContext(), requestIndexer);
+   } catch (Exception ex) {
+   if (client instanceof TransportClient && 
!callBridge.isConnected(((TransportClient) client))) {
+   TimeUnit.SECONDS.sleep(3);
--- End diff --

Should this be configurable?
Also, could you explain why you chose 3 seconds?
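
For reference, a retry loop with a configurable attempt count and delay could be sketched as follows. This is a generic illustration, not the PR's code: the class name, the `Supplier`-based signature, and the parameter names are assumptions.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: bounded retries with a configurable fixed delay. */
final class RetryWithDelay {
    private RetryWithDelay() {}

    /**
     * Runs the action up to maxAttempts times, sleeping delayMillis after
     * each failed attempt except the last. Rethrows the last failure if
     * every attempt fails.
     */
    static <T> T call(Supplier<T> action, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    TimeUnit.MILLISECONDS.sleep(delayMillis);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", ie);
                }
            }
        }
        throw last;
    }
}
```

With this shape, both the retry count and the delay would come from user configuration instead of being hard-coded.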




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056968#comment-16056968
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r123150004
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -208,6 +222,13 @@ public void invoke(T value) throws Exception {
checkErrorAndRethrow();
 
elasticsearchSinkFunction.process(value, getRuntimeContext(), 
requestIndexer);
+
+   // if there is a connectivity failure, then retry
+   if (failureThrowable.get() != null &&
+   client instanceof TransportClient &&
--- End diff --

Why exactly does the client need to be a `TransportClient`?




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2017-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15871433#comment-15871433
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user sbcd90 commented on the issue:

https://github.com/apache/flink/pull/1962
  
Hello @tzulitai ,

I have rebased the changes. Can you please review?




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839322#comment-15839322
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1962
  
Thank you for picking this up again @sbcd90. I would wait until #3112 is 
merged before rebasing.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839115#comment-15839115
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user sbcd90 commented on the issue:

https://github.com/apache/flink/pull/1962
  
Hi @tzulitai , thanks for the updates. I'll refactor the code & will rebase 
the PR.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834045#comment-15834045
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1962
  
Hi @sbcd90, would you like to continue working on this PR?

There's going to be a restructuring of the ES connectors (#3112), perhaps 
soon after the 1.2 release, and this PR will very likely need a rebase. I'd 
like to include this fix after the restructuring, so please let me know how 
you'd like to proceed with this contribution :)




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492554#comment-15492554
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1962
  
Thanks @HungUnicorn, that's useful info. I wonder, though, whether this config 
should be set by the user instead of being set internally by the connector.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436942#comment-15436942
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user HungUnicorn commented on the issue:

https://github.com/apache/flink/pull/1962
  
The sniffing feature of the transport client can achieve this.
With sniffing enabled, the client connects to all existing nodes and refreshes 
its list of connected nodes every 5 seconds by default. This fits our case 
because we only have to specify one IP, and the client then obtains a list of 
IPs that is updated periodically. 
It is enabled with
`Settings settings = Settings.settingsBuilder().put(userConfig).put("client.transport.sniff", true).build();`

Explanation:
> The Transport client comes with a cluster sniffing feature which allows 
it to dynamically add new hosts and remove old ones. When sniffing is enabled 
the transport client will connect to the nodes in its internal node list, 
which is built via calls to addTransportAddress. After this, the client will 
call the internal cluster state API on those nodes to discover available data 
nodes. The internal node list of the client will be replaced with those data 
nodes only. This list is refreshed every five seconds by default. 

Source:

https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.3//transport-client.html




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15339016#comment-15339016
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r67637714
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -166,30 +265,37 @@ public void open(Configuration configuration) {
transportClient.addTransportAddress(transport);
}
 
-   // verify that we actually are connected to a cluster
+   return transportClient;
+   }
+
+   private boolean checkConnectionStatus(TransportClient transportClient) {
--- End diff --

A static method would be better for a utility function that will be called a lot.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15339014#comment-15339014
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1962
  
Hi @sbcd90,
I think that to address "add reconnect attempt" alone, it should be fine to 
check in `invoke()`, before processing the element, whether the transport 
client is still connected to any nodes, and to retry connecting if not.

On the other hand, another problem that arises if we add a reconnect 
attempt to the ES sink is that records that fail due to connection errors also 
need to be caught in the `BulkProcessor` `afterBulk()` callback and 
re-processed. I wonder if we should solve this together with this 
issue. @rmetzger, what's your opinion?
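
The two concerns above can be sketched together, purely as an illustration. In the sketch, `isConnected`, `reconnect`, and `send` are hypothetical hooks standing in for the transport client check, the reconnect logic, and the request indexer, and `onBulkFailure` stands in for the failure branch of a bulk callback. None of these names come from the PR.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Hypothetical sketch: reconnect before processing and re-process failed records. */
final class RequeueOnFailureSketch<T> {
    // Thread-safe queue, since bulk callbacks may run on another thread.
    private final Queue<T> pending = new ConcurrentLinkedQueue<>();
    private final BooleanSupplier isConnected;
    private final Runnable reconnect;
    private final Consumer<T> send;

    RequeueOnFailureSketch(BooleanSupplier isConnected, Runnable reconnect, Consumer<T> send) {
        this.isConnected = isConnected;
        this.reconnect = reconnect;
        this.send = send;
    }

    /** Checks the connection first, then re-sends failed records, then the new one. */
    void invoke(T value) {
        if (!isConnected.getAsBoolean()) {
            reconnect.run();
        }
        T retry;
        while ((retry = pending.poll()) != null) {
            send.accept(retry);
        }
        send.accept(value);
    }

    /** Remembers records that a bulk request reported as failed. */
    void onBulkFailure(List<T> failedRecords) {
        pending.addAll(failedRecords);
    }
}
```

This sketch ignores ordering guarantees and checkpointing, both of which a real sink would need to consider.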




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15339005#comment-15339005
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r67636370
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -166,30 +265,37 @@ public void open(Configuration configuration) {
transportClient.addTransportAddress(transport);
}
 
-   // verify that we actually are connected to a cluster
+   return transportClient;
+   }
+
+   private boolean checkConnectionStatus(TransportClient transportClient) {
--- End diff --

`isConnected(TransportClient)` is a more suitable name for this method. 
Also, it can simply return
`return !ImmutableList.copyOf(transportClient.connectedNodes()).isEmpty()`
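
Whatever the final name, a method called `isConnected` would be expected to return true when the node list is non-empty. A minimal static sketch follows, with a plain `List` standing in for the result of `transportClient.connectedNodes()`; the class name is hypothetical.

```java
import java.util.List;

/** Hypothetical utility class, not part of the PR. */
final class ConnectionCheckSketch {
    private ConnectionCheckSketch() {}

    /** True when at least one node is connected; note the negation of isEmpty(). */
    static boolean isConnected(List<?> connectedNodes) {
        return !connectedNodes.isEmpty();
    }
}
```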




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15339006#comment-15339006
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r67636398
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -153,9 +165,96 @@ public ElasticsearchSink(Map 
userConfig, List
 */
@Override
public void open(Configuration configuration) {
+   TransportClient transportClient = connect();
+
+   if (checkConnectionStatus(transportClient)) {
+   client = transportClient;
+   }
--- End diff --

Should we retry in open() if the client is not connected to any nodes?




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15339004#comment-15339004
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r67636257
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -166,30 +265,37 @@ public void open(Configuration configuration) {
transportClient.addTransportAddress(transport);
}
 
-   // verify that we actually are connected to a cluster
+   return transportClient;
+   }
+
+   private boolean checkConnectionStatus(TransportClient transportClient) {
+   // Check if client is connected to any Elasticsearch nodes
ImmutableList nodes = 
ImmutableList.copyOf(transportClient.connectedNodes());
if (nodes.isEmpty()) {
-   throw new RuntimeException("Client is not connected to 
any Elasticsearch nodes!");
-   }
+   LOG.error("Client is not connected to any Elasticsearch 
nodes!");
--- End diff --

Would warn or debug level be more suitable than error here?




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15339003#comment-15339003
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r67636217
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -166,30 +265,37 @@ public void open(Configuration configuration) {
transportClient.addTransportAddress(transport);
}
 
-   // verify that we actually are connected to a cluster
+   return transportClient;
+   }
+
+   private boolean checkConnectionStatus(TransportClient transportClient) {
+   // Check if client is connected to any Elasticsearch nodes
ImmutableList nodes = 
ImmutableList.copyOf(transportClient.connectedNodes());
if (nodes.isEmpty()) {
-   throw new RuntimeException("Client is not connected to 
any Elasticsearch nodes!");
-   }
+   LOG.error("Client is not connected to any Elasticsearch 
nodes!");
+   return false;
+   } else {
 
-   client = transportClient;
-
-   if (LOG.isInfoEnabled()) {
-   LOG.info("Created Elasticsearch TransportClient {}", 
client);
+   if (LOG.isInfoEnabled()) {
+   LOG.info("Created Elasticsearch TransportClient 
{}", client);
+   }
--- End diff --

The log message here doesn't match the purpose of this method.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328918#comment-15328918
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1962
  
Hi @sbcd90,
Sorry for the late reply; I'm currently busy with some other things. I'll be 
happy to help review again within the next 2~3 days.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323745#comment-15323745
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user sbcd90 commented on the issue:

https://github.com/apache/flink/pull/1962
  
Hello @tzulitai ,

I think the default value for an int field in Java is 0.
Checking whether the connection is lost and then retrying the connection is a 
good suggestion. I made the change.
I also separated the methods for connection creation and connection status check.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322006#comment-15322006
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1962
  
Hi @sbcd90,

I gave the changes a quick review and left some comments. Please let me know your 
opinion on them. I hope they'll help get you going.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15321996#comment-15321996
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r66388852
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -153,9 +165,90 @@ public ElasticsearchSink(Map 
userConfig, List
 */
@Override
public void open(Configuration configuration) {
+   connect();
+
+   params = ParameterTool.fromMap(userConfig);
+
+   if (params.has(CONFIG_KEY_CONNECTION_RETRIES)) {
+   this.connectionRetries = params.getInt(CONFIG_KEY_CONNECTION_RETRIES);
+   }
+
+   buildBulkProcessorIndexer(client);
--- End diff --

Same here. We should try to avoid modifying class fields as a side effect of 
utility methods.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15321994#comment-15321994
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r66388796
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -153,9 +165,90 @@ public ElasticsearchSink(Map 
userConfig, List
 */
@Override
public void open(Configuration configuration) {
+   connect();
--- End diff --

I think we should try to avoid modifying class fields as a side effect of 
utility methods; it reduces the readability of the code. Could we return the 
created TransportClient instead, and have a separate utility function to check 
whether the connection is lost?
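
A minimal sketch of this suggestion, not the code from the pull request. The `Client` interface is a hypothetical stand-in for the Elasticsearch `TransportClient`, and `createClient`, `isConnectionLost` and `fakeClient` are illustrative names. The point is that both helpers are free of side effects on the sink's fields: one returns the new client, the other only inspects it.

```java
import java.util.List;
import java.util.function.Supplier;

public class ConnectSketch {
    /** Hypothetical stand-in for the Elasticsearch TransportClient. */
    interface Client {
        List<String> connectedNodes();
        void close();
    }

    /** Creates and returns a client; it does not assign any field of the sink. */
    static Client createClient(Supplier<Client> factory) {
        return factory.get();
    }

    /** Pure check: true when there is no client or it has no connected nodes. */
    static boolean isConnectionLost(Client client) {
        return client == null || client.connectedNodes().isEmpty();
    }

    /** Simple fake client used only for demonstration. */
    static Client fakeClient(final List<String> nodes) {
        return new Client() {
            @Override
            public List<String> connectedNodes() {
                return nodes;
            }

            @Override
            public void close() {
                // nothing to release in the fake
            }
        };
    }
}
```

With helpers like these, the caller decides where the returned client is stored, so the assignment stays visible at the call site.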




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15321989#comment-15321989
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r66388492
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -153,9 +165,90 @@ public ElasticsearchSink(Map 
userConfig, List
 */
@Override
public void open(Configuration configuration) {
+   connect();
+
+   params = ParameterTool.fromMap(userConfig);
+
+   if (params.has(CONFIG_KEY_CONNECTION_RETRIES)) {
+   this.connectionRetries = params.getInt(CONFIG_KEY_CONNECTION_RETRIES);
+   }
+
+   buildBulkProcessorIndexer(client);
+   }
+
+   @Override
+   public void invoke(T element) {
+   elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);
+
+   if (hasFailure.get()) {
--- End diff --

Another problem with this implementation is that we're capturing the 
failure of a bulk operation, but only retrying for the currently processed 
element.

Would it be simpler to check whether the Elasticsearch client is still 
connected to any nodes before `elasticsearchSinkFunction.process(element, 
getRuntimeContext(), requestIndexer);` via `client.connectedNodes().size()`? If 
not, we retry establishing the connection before processing the element.

Another approach that might be better is to let the BulkProcessor set a 
`lostConnection` flag if it gets an ES connection error for a batch, and 
simply check the flag in invoke() before doing anything else. But 
we would still need a way to handle all the records in that batch that failed 
due to the lost connection.
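
The second approach could be sketched roughly as follows. This is an illustration under assumptions, not the pull request's code: records are plain strings, `onBatchConnectionFailure` stands in for whatever callback the bulk listener would invoke, and the "reconnect" is reduced to a counter. Failed records are queued before the flag is raised, so that `invoke()` always finds them once it sees the flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class LostConnectionFlagSketch {
    private final AtomicBoolean lostConnection = new AtomicBoolean(false);
    // Records of batches that failed because the connection was lost.
    private final Queue<String> pendingRetry = new ConcurrentLinkedQueue<>();

    final List<String> sent = new ArrayList<>(); // stands in for "written to ES"
    int reconnects = 0;                          // counts simulated reconnects

    /** Hypothetical hook: called by the bulk listener on a connection error. */
    void onBatchConnectionFailure(List<String> failedBatch) {
        pendingRetry.addAll(failedBatch); // queue first ...
        lostConnection.set(true);         // ... then raise the flag
    }

    void invoke(String element) {
        // Check the flag before doing anything else.
        if (lostConnection.compareAndSet(true, false)) {
            reconnects++; // placeholder for re-establishing the connection
            String record;
            while ((record = pendingRetry.poll()) != null) {
                sent.add(record); // re-send records from the failed batch
            }
        }
        sent.add(element);
    }
}
```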




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15321935#comment-15321935
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r66385681
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -153,9 +165,90 @@ public ElasticsearchSink(Map 
userConfig, List
 */
@Override
public void open(Configuration configuration) {
+   connect();
+
+   params = ParameterTool.fromMap(userConfig);
+
+   if (params.has(CONFIG_KEY_CONNECTION_RETRIES)) {
+   this.connectionRetries = params.getInt(CONFIG_KEY_CONNECTION_RETRIES);
+   }
+
+   buildBulkProcessorIndexer(client);
+   }
+
+   @Override
+   public void invoke(T element) {
+   elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);
+
+   if (hasFailure.get()) {
--- End diff --

Won't there be other causes of failure besides a connection error? Attempting 
to reconnect for every kind of failure doesn't seem right.
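
One way to make that distinction is to inspect the cause chain of the recorded failure and reconnect only for connection-related exceptions. The sketch below uses JDK exception types so that it stays self-contained; which exception types the Elasticsearch client actually surfaces for a lost connection is an assumption that would need checking against the client in use. `isConnectionFailure` is an illustrative name.

```java
import java.net.ConnectException;
import java.net.UnknownHostException;

public class FailureClassifierSketch {
    /** Upper bound on the cause chain, in case of a cyclic chain. */
    private static final int MAX_DEPTH = 32;

    /** Returns true only if the failure (or one of its causes) looks like a lost connection. */
    static boolean isConnectionFailure(Throwable failure) {
        int depth = 0;
        for (Throwable t = failure; t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
            if (t instanceof ConnectException || t instanceof UnknownHostException) {
                return true;
            }
        }
        return false;
    }
}
```

Any other failure, for example a rejected document, would then propagate as before instead of triggering a reconnect.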




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15321931#comment-15321931
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r66385603
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -153,9 +165,90 @@ public ElasticsearchSink(Map 
userConfig, List
 */
@Override
public void open(Configuration configuration) {
+   connect();
+
+   params = ParameterTool.fromMap(userConfig);
+
+   if (params.has(CONFIG_KEY_CONNECTION_RETRIES)) {
+   this.connectionRetries = params.getInt(CONFIG_KEY_CONNECTION_RETRIES);
+   }
--- End diff --

Shouldn't a default value be set if the user does not specify one? Otherwise we get a null 
exception in invoke().
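
A small sketch of reading the setting with an explicit default. It uses a plain `Map` instead of `ParameterTool` to stay self-contained; the key string `connection.retries` and the default of 3 are made-up values for illustration, not taken from the pull request.

```java
import java.util.Map;

public class RetryConfigSketch {
    // Hypothetical key string and default value, chosen for this example only.
    static final String CONFIG_KEY_CONNECTION_RETRIES = "connection.retries";
    static final int DEFAULT_CONNECTION_RETRIES = 3;

    /** Returns the configured retry count, or the default when the key is absent. */
    static int connectionRetries(Map<String, String> userConfig) {
        String raw = userConfig.get(CONFIG_KEY_CONNECTION_RETRIES);
        if (raw == null) {
            return DEFAULT_CONNECTION_RETRIES;
        }
        int retries = Integer.parseInt(raw.trim());
        if (retries < 0) {
            throw new IllegalArgumentException(
                CONFIG_KEY_CONNECTION_RETRIES + " must not be negative: " + retries);
        }
        return retries;
    }
}
```

An explicit default also documents the intended behavior, rather than relying on the implicit 0 of an uninitialized `int` field.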




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15321852#comment-15321852
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user sbcd90 commented on the issue:

https://github.com/apache/flink/pull/1962
  
Hello @StephanEwen ,

I have removed the timer and am now doing the retry logic directly. The backoff is 
3s. Please have a look.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15318993#comment-15318993
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1962
  
I took a quick look at this. I am wondering if this actually needs an extra 
timer service for retries.

Can this be solved without a timer? The failures could be detected in the 
`invoke(...)` method, and the retry done directly there (with some minimal 
backoff or so). 

Triggering asynchronous timers is very complex and easily creates leaks, 
races, or leftover work / tasks at shutdown.
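
A timer-free retry of this kind might look like the sketch below: the calling thread retries the action itself and sleeps for a fixed backoff between attempts, so nothing is left running at shutdown. `retry`, `maxRetries` and `backoffMillis` are illustrative names, and the fixed backoff is an assumption; the pull request may handle this differently.

```java
import java.util.function.Supplier;

public class RetryWithBackoffSketch {
    /**
     * Runs the action on the calling thread, retrying up to maxRetries times
     * after a failure, with a fixed pause between attempts.
     */
    static <T> T retry(Supplier<T> action, int maxRetries, long backoffMillis) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and possibly try again
            }
            if (attempt < maxRetries) {
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag so cancellation is not swallowed.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", ie);
                }
            }
        }
        throw last; // all attempts failed
    }
}
```

Because the sleep blocks the task thread, the total wait (`maxRetries * backoffMillis`) should stay small relative to checkpoint and timeout settings.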




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15274627#comment-15274627
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user sbcd90 commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-217540405
  
Hello @rmetzger ,

I have now added a test case to the tests in `ElasticsearchSinkITCase.java`. 
Could you kindly have a look?




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15273771#comment-15273771
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-217373539
  
How did you test the code you've implemented in this pull request?




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15273608#comment-15273608
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user sbcd90 commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-217342680
  
Hello @rmetzger ,

Looking at the test case `ElasticsearchSinkITCase.testTransportClient`, I 
think that to test the reconnect scenario, `hasFailure` may need to be made 
`public` so that the test method can set it.
Could you kindly provide some suggestions?




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15273230#comment-15273230
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user sbcd90 commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-217298024
  
Hello @rmetzger ,

Thanks a lot for reviewing the PR.
I have made all the changes you mentioned in your inline comments and 
added some documentation.
Kindly have a look now.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272205#comment-15272205
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-217126041
  
The change is missing documentation updates & test cases.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272203#comment-15272203
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-217125875
  
Thank you for opening the pull request.

I made some inline comments.
I don't think the proposed changes fix the issue described in the JIRA.
I would check on each `invoke()` whether `hasFailure` is set. If that's the 
case, you can reconnect to ES.




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272204#comment-15272204
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62174087
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
if (LOG.isInfoEnabled()) {
LOG.info("Created Elasticsearch TransportClient {}", client);
}
+   }
+
+   @Override
+   public void invoke(final T element) {
+   ParameterTool params = ParameterTool.fromMap(userConfig);
+
+   if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
+   final Timer timer = new Timer(true);
+   TimerTask task = new TimerTask() {
+   @Override
+   public void run() {
+   // verify that we actually are connected to a cluster
+   ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
+   if (nodes.isEmpty()) {
+   if (LOG.isInfoEnabled()) {
+   LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
+   }
+   open(new Configuration());
--- End diff --

I would not recommend calling the open() method from invoke(). open() is a 
"lifecycle" method called by Flink. You should assume it is only called once.
However, you can move the (re)connect logic into a separate method.
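
The separation could be sketched as below. Everything here is a simplified stand-in: `Client` and `FakeClient` replace the Elasticsearch client, and `reconnect` is an illustrative helper name. `open()` runs once and only creates the first client, while `invoke()` delegates to the helper, which also closes the old client before replacing it.

```java
import java.util.function.Supplier;

public class ReconnectLifecycleSketch {
    /** Hypothetical stand-in for the Elasticsearch client. */
    interface Client {
        boolean isConnected();
        void close();
    }

    /** Fake client whose state can be toggled for demonstration. */
    static class FakeClient implements Client {
        boolean connected = true;
        boolean closed = false;

        @Override
        public boolean isConnected() {
            return connected;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    private final Supplier<? extends Client> factory;
    private Client client;
    int openCalls = 0;

    ReconnectLifecycleSketch(Supplier<? extends Client> factory) {
        this.factory = factory;
    }

    /** Lifecycle method: assumed to be called exactly once by the framework. */
    void open() {
        openCalls++;
        client = factory.get();
    }

    void invoke(String element) {
        if (!client.isConnected()) {
            client = reconnect(client);
        }
        // ... hand the element to the client here ...
    }

    /** Separate (re)connect logic: releases the old client, returns a new one. */
    private Client reconnect(Client old) {
        old.close();
        return factory.get();
    }
}
```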




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272198#comment-15272198
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62173382
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
if (LOG.isInfoEnabled()) {
LOG.info("Created Elasticsearch TransportClient {}", client);
}
+   }
+
+   @Override
+   public void invoke(final T element) {
+   ParameterTool params = ParameterTool.fromMap(userConfig);
+
+   if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
+   final Timer timer = new Timer(true);
+   TimerTask task = new TimerTask() {
+   @Override
+   public void run() {
+   // verify that we actually are connected to a cluster
+   ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
+   if (nodes.isEmpty()) {
+   if (LOG.isInfoEnabled()) {
+   LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
+   }
+   open(new Configuration());
--- End diff --

The old client is not closed.





[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272197#comment-15272197
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62173042
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
if (LOG.isInfoEnabled()) {
LOG.info("Created Elasticsearch TransportClient {}", client);
}
+   }
+
+   @Override
+   public void invoke(final T element) {
+   ParameterTool params = ParameterTool.fromMap(userConfig);
+
+   if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
+   final Timer timer = new Timer(true);
+   TimerTask task = new TimerTask() {
+   @Override
+   public void run() {
+   // verify that we actually are connected to a cluster
+   ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
+   if (nodes.isEmpty()) {
+   if (LOG.isInfoEnabled()) {
+   LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
+   }
+   open(new Configuration());
+   } else {
+   timer.cancel();
+   intializeAndCallElasticSearchSinkFunction(element);
+   }
+   }
+   };
+
+   timer.scheduleAtFixedRate(task, 0, 3000);
+
+   try {
+   Thread.sleep(3000 * params.getInt(CONFIG_NO_OF_CONN_RETRIES));
+   } catch (InterruptedException e) {
+   throw new RuntimeException(e.getMessage());
+   }
+   timer.cancel();
+   // verify that we actually are connected to a cluster
+   ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
+   if (nodes.isEmpty()) {
+   throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
+   }
+   } else {
+   intializeAndCallElasticSearchSinkFunction(element);
+   }
+   }
+
+   @Override
+   public void close() {
+   if (bulkProcessor != null) {
+   bulkProcessor.close();
+   bulkProcessor = null;
+   }
+
+   if (client != null) {
+   client.close();
+   }
+
+   if (hasFailure.get()) {
+   Throwable cause = failureThrowable.get();
+   if (cause != null) {
+   throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
+   } else {
+   throw new RuntimeException("An error occured in ElasticsearchSink.");
+   }
+   }
 
+   }
+
+   private void intializeAndCallElasticSearchSinkFunction(T element) {
--- End diff --

Where is the method initializing the ES sink?




[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272195#comment-15272195
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62172954
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
if (LOG.isInfoEnabled()) {
LOG.info("Created Elasticsearch TransportClient {}", client);
}
+   }
+
+   @Override
+   public void invoke(final T element) {
+   ParameterTool params = ParameterTool.fromMap(userConfig);
+
+   if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
+   final Timer timer = new Timer(true);
+   TimerTask task = new TimerTask() {
+   @Override
+   public void run() {
+   // verify that we actually are connected to a cluster
+   ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
+   if (nodes.isEmpty()) {
+   if (LOG.isInfoEnabled()) {
+   LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
+   }
+   open(new Configuration());
--- End diff --

The passed `element` is never sent to ES in this case, right?
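
One way to address this concern is to retry synchronously, so that the element that triggered the failure is re-sent once the connection has been re-opened. The following is only a rough sketch in plain Java, with no Flink or Elasticsearch dependency. `Connection`, `RetryingSender`, `maxRetries` and `retryDelayMs` are hypothetical names chosen for illustration and are not part of the pull request; the `Supplier` stands in for whatever `open()` does to create the client.

```java
import java.io.IOException;
import java.util.function.Supplier;

/** Hypothetical stand-in for the Elasticsearch client; not a real API. */
interface Connection {
    void send(String element) throws IOException;
}

/** Sketch: on a failed send, re-open the connection and re-send the same element. */
final class RetryingSender {
    private final Supplier<Connection> connectionFactory; // plays the role of open()
    private final int maxRetries;      // assumed to come from the user config
    private final long retryDelayMs;   // assumed to come from the user config
    private Connection connection;

    RetryingSender(Supplier<Connection> connectionFactory, int maxRetries, long retryDelayMs) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        this.connectionFactory = connectionFactory;
        this.maxRetries = maxRetries;
        this.retryDelayMs = retryDelayMs;
        this.connection = connectionFactory.get();
    }

    /** Plays the role of invoke(): the element is never dropped silently. */
    void send(String element) throws IOException, InterruptedException {
        IOException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                connection.send(element);
                return; // delivered
            } catch (IOException e) {
                lastFailure = e;
                if (attempt < maxRetries) {
                    Thread.sleep(retryDelayMs);            // wait before reconnecting
                    connection = connectionFactory.get();  // re-open the connection
                }
            }
        }
        // All attempts failed: surface the error instead of losing the element.
        throw lastFailure;
    }
}
```

Compared with scheduling the reconnect on a separate timer thread, this keeps the failed element in scope until it has either been delivered or the retries are exhausted, at the cost of blocking the calling thread during the delay.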



[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272192#comment-15272192
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62172911
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
         if (LOG.isInfoEnabled()) {
             LOG.info("Created Elasticsearch TransportClient {}", client);
         }
+    }
+
+    @Override
+    public void invoke(final T element) {
+        ParameterTool params = ParameterTool.fromMap(userConfig);
+
+        if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
--- End diff --

This performs two hash map lookups for each element. The check only needs 
to be done once, in the `open()` method.
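
A minimal sketch of that suggestion, in plain Java with no Flink dependency: the retry setting is looked up and parsed once in `open()` and cached in fields, so the per-element path only reads those fields. `ConfigOnceSink` is a hypothetical class, and the key string `"connection.retries"` is an assumption made for illustration, not a value taken from the pull request.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of a sink that resolves its retry setting once instead of per element. */
final class ConfigOnceSink {
    // Hypothetical key; the actual name and value are up to the connector.
    static final String CONFIG_KEY_CONNECTION_RETRIES = "connection.retries";

    private final Map<String, String> userConfig;
    private int connectionRetries;  // resolved in open()
    private boolean retryEnabled;   // resolved in open()

    ConfigOnceSink(Map<String, String> userConfig) {
        this.userConfig = new HashMap<>(userConfig);
    }

    /** Called once per sink instance: the only place the map is consulted. */
    void open() {
        String raw = userConfig.get(CONFIG_KEY_CONNECTION_RETRIES);
        connectionRetries = (raw == null) ? 0 : Integer.parseInt(raw.trim());
        retryEnabled = connectionRetries > 0;
    }

    /** Called per element: reads cached fields only, no lookups or parsing. */
    String invoke(String element) {
        return retryEnabled
            ? element + " (up to " + connectionRetries + " reconnect attempts)"
            : element + " (no reconnect)";
    }
}
```

The string returned by `invoke()` is only there to make the cached decision visible in this sketch; a real sink would branch into its send logic instead.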



[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272191#comment-15272191
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62172611
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
         if (LOG.isInfoEnabled()) {
             LOG.info("Created Elasticsearch TransportClient {}", client);
         }
+    }
+
+    @Override
+    public void invoke(final T element) {
+        ParameterTool params = ParameterTool.fromMap(userConfig);
--- End diff --

It's very inefficient to create a new `ParameterTool` instance for each 
incoming element.



[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272189#comment-15272189
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62172571
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -86,6 +88,7 @@
     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
     public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
     public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+    public static final String CONFIG_NO_OF_CONN_RETRIES = "conn.retries";
--- End diff --

We usually don't abbreviate configuration keys. Can you rename it to 
"CONFIG_KEY_CONNECTION_RETRIES"?



[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15270670#comment-15270670
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user sbcd90 commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-216874916
  
Hello @fhueske, the test failures are not caused by the changes made in this 
PR. I ran the JUnit tests for the Elasticsearch connector and all of them pass.
Could you kindly have a look?



[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15270033#comment-15270033
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

GitHub user sbcd90 opened a pull request:

https://github.com/apache/flink/pull/1962

[FLINK-3857][Streaming Connectors]Add reconnect attempt to Elasticsearch host

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-3857] Add reconnect attempt to Elasticsearch host")

- [ ] Documentation
  - Documentation added based on the changes made.

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sbcd90/flink elasticSearchRetryIssue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1962.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1962


commit 62990c984f0d2eca3ba89ed9c2d22c469f16b136
Author: Subhobrata Dey 
Date:   2016-05-04T02:16:35Z

[FLINK-3857][Streaming Connectors]Add reconnect attempt to Elasticsearch host





[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-05-03 Thread Subhobrata Dey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15269345#comment-15269345
 ] 

Subhobrata Dey commented on FLINK-3857:
---

Hello [~fhueske],

I'm interested in this task and am assigning it to myself.
