[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-06 Thread sbcd90
Github user sbcd90 commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-217540405
  
Hello @rmetzger ,

I have now added a test case to `ElasticsearchSinkITCase.java`.
Could you kindly have a look?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-06 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-217373539
  
How did you test the code you've implemented in this pull request?




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-05 Thread sbcd90
Github user sbcd90 commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-217342680
  
Hello @rmetzger ,

Looking at the test case `ElasticsearchSinkITCase.testTransportClient`, I 
think that to test the reconnect scenario, `hasFailure` may need to be made 
`public` so that the test method can set it.
Could you kindly provide some suggestions?




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-05 Thread sbcd90
Github user sbcd90 commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-217298024
  
Hello @rmetzger ,

Thanks a lot for reviewing the PR.
I have made all the changes you requested in your inline comments and 
added some documentation.
Kindly have a look now.




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62174087
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
         if (LOG.isInfoEnabled()) {
             LOG.info("Created Elasticsearch TransportClient {}", client);
         }
+    }
+
+    @Override
+    public void invoke(final T element) {
+        ParameterTool params = ParameterTool.fromMap(userConfig);
+
+        if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
+            final Timer timer = new Timer(true);
+            TimerTask task = new TimerTask() {
+                @Override
+                public void run() {
+                    // verify that we actually are connected to a cluster
+                    ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
+                    if (nodes.isEmpty()) {
+                        if (LOG.isInfoEnabled()) {
+                            LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
+                        }
+                        open(new Configuration());
--- End diff --

I would not recommend calling the `open()` method from `invoke()`. `open()` is a 
"lifecycle" method called by Flink. You should assume it's only called once.
However, you can move the (re)connect logic into a separate method.
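
A minimal sketch of that separation, assuming a hypothetical `SketchClient` interface in place of the real Elasticsearch client and a hypothetical `LifecycleSketchSink` class in place of the actual sink. It only illustrates the shape: `open()` runs once and delegates to a private `connect()`, and `invoke()` reuses `connect()` instead of re-entering the lifecycle method.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the Elasticsearch client used by the sink.
interface SketchClient {
    boolean hasConnectedNodes();
    void close();
}

// Hypothetical sink; not the ElasticsearchSink from the pull request.
class LifecycleSketchSink {
    private final Supplier<SketchClient> clientFactory;
    private SketchClient client;
    int connectCount = 0; // exposed only so the sketch can be exercised

    LifecycleSketchSink(Supplier<SketchClient> clientFactory) {
        this.clientFactory = clientFactory;
    }

    // Lifecycle method: assumed to be called exactly once, before any invoke().
    void open() {
        connect();
    }

    // Called per element; reconnects through connect(), never through open().
    void invoke(String element) {
        if (!client.hasConnectedNodes()) {
            connect();
        }
        // ... forward `element` to the client here ...
    }

    // The (re)connect logic lives in its own method.
    private void connect() {
        client = clientFactory.get();
        connectCount++;
    }
}
```

In this sketch `connect()` does not close the previous client, so that concern still has to be handled separately.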




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-05 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-217126041
  
The change is missing documentation updates & test cases.




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-05 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-217125875
  
Thank you for opening the pull request.

I made some inline comments.
I don't think the proposed changes fix the issue described in the JIRA.
I would check on each `invoke()` whether `hasFailure` is set. If that's the 
case, you can reconnect to ES.
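
A rough sketch of that check, assuming `hasFailure` is an `AtomicBoolean` that an asynchronous failure callback sets. The class name `FailureFlagSinkSketch` and the `reconnect()` placeholder are hypothetical. `compareAndSet` clears the flag and triggers one reconnect per reported failure.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sink; only the flag handling is illustrated.
class FailureFlagSinkSketch {
    // Assumed to be set to true by an asynchronous failure callback.
    final AtomicBoolean hasFailure = new AtomicBoolean(false);
    int reconnects = 0; // counters exist only to make the sketch observable
    int sent = 0;

    void invoke(String element) {
        // Atomically test and clear the flag; reconnect only if it was set.
        if (hasFailure.compareAndSet(true, false)) {
            reconnect();
        }
        sent++; // placeholder for handing `element` to the client
    }

    // Placeholder for the real reconnect logic.
    private void reconnect() {
        reconnects++;
    }
}
```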




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62173382
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
         if (LOG.isInfoEnabled()) {
             LOG.info("Created Elasticsearch TransportClient {}", client);
         }
+    }
+
+    @Override
+    public void invoke(final T element) {
+        ParameterTool params = ParameterTool.fromMap(userConfig);
+
+        if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
+            final Timer timer = new Timer(true);
+            TimerTask task = new TimerTask() {
+                @Override
+                public void run() {
+                    // verify that we actually are connected to a cluster
+                    ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
+                    if (nodes.isEmpty()) {
+                        if (LOG.isInfoEnabled()) {
+                            LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
+                        }
+                        open(new Configuration());
--- End diff --

The old client is not closed.
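
One way to avoid leaking the old client, sketched under the assumption that the client can be treated as an `AutoCloseable`. The `ClientHolderSketch` class and its method names are hypothetical. The new client is installed first, then the previous one is closed on a best-effort basis.

```java
// Hypothetical holder; `AutoCloseable` stands in for the real client type.
class ClientHolderSketch {
    private AutoCloseable client;

    // Installs a fresh client and closes the previous one so its resources are released.
    void replaceClient(AutoCloseable fresh) {
        AutoCloseable old = client;
        client = fresh;
        if (old != null) {
            try {
                old.close();
            } catch (Exception e) {
                // Closing the stale client is best effort in this sketch.
            }
        }
    }

    AutoCloseable current() {
        return client;
    }
}
```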





[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62173042
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
         if (LOG.isInfoEnabled()) {
             LOG.info("Created Elasticsearch TransportClient {}", client);
         }
+    }
+
+    @Override
+    public void invoke(final T element) {
+        ParameterTool params = ParameterTool.fromMap(userConfig);
+
+        if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
+            final Timer timer = new Timer(true);
+            TimerTask task = new TimerTask() {
+                @Override
+                public void run() {
+                    // verify that we actually are connected to a cluster
+                    ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
+                    if (nodes.isEmpty()) {
+                        if (LOG.isInfoEnabled()) {
+                            LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
+                        }
+                        open(new Configuration());
+                    } else {
+                        timer.cancel();
+                        intializeAndCallElasticSearchSinkFunction(element);
+                    }
+                }
+            };
+
+            timer.scheduleAtFixedRate(task, 0, 3000);
+
+            try {
+                Thread.sleep(3000 * params.getInt(CONFIG_NO_OF_CONN_RETRIES));
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e.getMessage());
+            }
+            timer.cancel();
+            // verify that we actually are connected to a cluster
+            ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
+            if (nodes.isEmpty()) {
+                throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
+            }
+        } else {
+            intializeAndCallElasticSearchSinkFunction(element);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (bulkProcessor != null) {
+            bulkProcessor.close();
+            bulkProcessor = null;
+        }
+
+        if (client != null) {
+            client.close();
+        }
+
+        if (hasFailure.get()) {
+            Throwable cause = failureThrowable.get();
+            if (cause != null) {
+                throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
+            } else {
+                throw new RuntimeException("An error occured in ElasticsearchSink.");
+            }
+        }
 
+    }
+
+    private void intializeAndCallElasticSearchSinkFunction(T element) {
--- End diff --

Where is the method initializing the ES sink?




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62172954
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
         if (LOG.isInfoEnabled()) {
             LOG.info("Created Elasticsearch TransportClient {}", client);
         }
+    }
+
+    @Override
+    public void invoke(final T element) {
+        ParameterTool params = ParameterTool.fromMap(userConfig);
+
+        if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
+            final Timer timer = new Timer(true);
+            TimerTask task = new TimerTask() {
+                @Override
+                public void run() {
+                    // verify that we actually are connected to a cluster
+                    ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
+                    if (nodes.isEmpty()) {
+                        if (LOG.isInfoEnabled()) {
+                            LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
+                        }
+                        open(new Configuration());
--- End diff --

The passed `element` is never sent to ES in this case, right?
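
To illustrate one way the element could still be delivered, here is a hedged sketch of a bounded retry loop that resends the same element after each reconnect. `RetrySketch.deliver`, its parameters, and the use of `Consumer`/`Runnable` as stand-ins for the send and reconnect steps are assumptions for illustration, not code from the pull request.

```java
import java.util.function.Consumer;

// Hypothetical helper; `send` and `reconnect` stand in for the real client calls.
class RetrySketch {
    // Tries to deliver `element`; on failure it reconnects and retries,
    // allowing up to `maxRetries` additional attempts after the first one.
    // Returns the number of attempts that were needed.
    static <T> int deliver(T element, Consumer<T> send, Runnable reconnect, int maxRetries) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                send.accept(element);
                return attempt + 1;
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxRetries) {
                    reconnect.run();
                }
            }
        }
        throw new RuntimeException(
            "Element could not be sent after " + (maxRetries + 1) + " attempts", last);
    }
}
```

The point of the loop is that the element is retried rather than dropped; whether blocking inside `invoke()` is acceptable is a separate design question.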




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62172911
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
         if (LOG.isInfoEnabled()) {
             LOG.info("Created Elasticsearch TransportClient {}", client);
         }
+    }
+
+    @Override
+    public void invoke(final T element) {
+        ParameterTool params = ParameterTool.fromMap(userConfig);
+
+        if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
--- End diff --

These are two hash map lookups for each element. The check can be done 
once, in the `open()` method.
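
A small sketch of resolving the setting once, assuming the user configuration is a plain `Map<String, String>`. The class `CachedConfigSink` is hypothetical, and the key string `"connection.retries"` is an assumption chosen for illustration. The parsed value is cached in a field during `open()`, so the per-element path does no lookup or parsing.

```java
import java.util.Map;

// Hypothetical sink; only the configuration handling is illustrated.
class CachedConfigSink {
    // The key string here is an assumption, not the value used in the pull request.
    static final String CONFIG_KEY_CONNECTION_RETRIES = "connection.retries";

    private final Map<String, String> userConfig;
    private int connectionRetries; // resolved once in open()

    CachedConfigSink(Map<String, String> userConfig) {
        this.userConfig = userConfig;
    }

    // Lifecycle method: read and parse the setting a single time.
    void open() {
        String raw = userConfig.get(CONFIG_KEY_CONNECTION_RETRIES);
        connectionRetries = (raw == null) ? 0 : Integer.parseInt(raw);
    }

    // Per-element path: a cheap field comparison, no map lookup or parsing.
    boolean retriesEnabled() {
        return connectionRetries > 0;
    }

    int getConnectionRetries() {
        return connectionRetries;
    }
}
```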




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62172611
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
         if (LOG.isInfoEnabled()) {
             LOG.info("Created Elasticsearch TransportClient {}", client);
         }
+    }
+
+    @Override
+    public void invoke(final T element) {
+        ParameterTool params = ParameterTool.fromMap(userConfig);
--- End diff --

It's very inefficient to create a new `ParameterTool` instance for each 
incoming element.




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1962#discussion_r62172571
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -86,6 +88,7 @@
     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
     public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
     public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+    public static final String CONFIG_NO_OF_CONN_RETRIES = "conn.retries";
--- End diff --

We usually don't abbreviate configuration keys. Can you rename it to 
"CONFIG_KEY_CONNECTION_RETRIES" ?




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-04 Thread sbcd90
Github user sbcd90 commented on the pull request:

https://github.com/apache/flink/pull/1962#issuecomment-216874916
  
Hello @fhueske , the test failures are not caused by the changes made in this 
PR. I ran the JUnit tests for the Elasticsearch connector and all of them run fine.
Could you kindly have a look?




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

2016-05-03 Thread sbcd90
GitHub user sbcd90 opened a pull request:

https://github.com/apache/flink/pull/1962

[FLINK-3857][Streaming Connectors]Add reconnect attempt to Elasticsearch host

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-3857] Add reconnect attempt to Elasticsearch host")

- [ ] Documentation
  - Documentation added based on the changes made.

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sbcd90/flink elasticSearchRetryIssue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1962.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1962


commit 62990c984f0d2eca3ba89ed9c2d22c469f16b136
Author: Subhobrata Dey 
Date:   2016-05-04T02:16:35Z

[FLINK-3857][Streaming Connectors]Add reconnect attempt to Elasticsearch host



