[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user sbcd90 commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-217540405

    Hello @rmetzger,

    I have now added a test case to `ElasticsearchSinkITCase.java`. Could you kindly have a look?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-217373539

    How did you test the code you've implemented in this pull request?
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user sbcd90 commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-217342680

    Hello @rmetzger,

    Looking at the test case `ElasticsearchSinkITCase.testTransportClient`, I think that to test the reconnect scenario, `hasFailure` may need to be made `public` so that the test method can set it. Could you kindly provide some suggestions?
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user sbcd90 commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-217298024

    Hello @rmetzger,

    Thanks a lot for reviewing the PR. I have made all the changes you requested in the inline comments and added some documentation. Kindly have a look now.
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62174087

    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -177,7 +180,72 @@ public void open(Configuration configuration) {
             if (LOG.isInfoEnabled()) {
                 LOG.info("Created Elasticsearch TransportClient {}", client);
             }
    +    }
    +
    +    @Override
    +    public void invoke(final T element) {
    +        ParameterTool params = ParameterTool.fromMap(userConfig);
    +
    +        if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
    +            final Timer timer = new Timer(true);
    +            TimerTask task = new TimerTask() {
    +                @Override
    +                public void run() {
    +                    // verify that we actually are connected to a cluster
    +                    ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
    +                    if (nodes.isEmpty()) {
    +                        if (LOG.isInfoEnabled()) {
    +                            LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
    +                        }
    +                        open(new Configuration());
    --- End diff --

    I would not recommend calling the open() method from invoke(). open() is a "lifecycle" method called by Flink. You should assume it's only called once. However, you can move the (re)connect logic into a separate method.
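The suggestion above (keep open() as a once-only lifecycle method and move the reconnect logic into its own method) might look roughly like the following. This is only a minimal sketch, not the code from the pull request: `SketchClient`, `SketchClientFactory`, `ReconnectingSinkSketch` and `ReconnectDemo` are hypothetical names introduced for illustration, and the client interface is a stand-in rather than the Elasticsearch `TransportClient` API. The sketch also closes the previous client before creating a new one.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the Elasticsearch client; not the real TransportClient API. */
interface SketchClient {
    boolean isConnected();
    void close();
}

/** Hypothetical factory, so the sketch runs without a cluster. */
interface SketchClientFactory {
    SketchClient create();
}

class ReconnectingSinkSketch {
    private final SketchClientFactory factory;
    private SketchClient client;

    ReconnectingSinkSketch(SketchClientFactory factory) {
        this.factory = factory;
    }

    /** Lifecycle method: assumed to be called exactly once by the framework. */
    void open() {
        connect();
    }

    /** (Re)connect logic kept out of open(); the previous client is closed first. */
    void connect() {
        if (client != null) {
            client.close();
        }
        client = factory.create();
    }

    /** Reconnects through connect(), never through open(). */
    void invoke(String element) {
        if (!client.isConnected()) {
            connect();
        }
        // ... hand `element` to the (now connected) client here ...
    }
}

class ReconnectDemo {
    /** Returns {clients created, clients closed} after one forced reconnect. */
    static int[] run() {
        AtomicInteger created = new AtomicInteger();
        AtomicInteger closed = new AtomicInteger();
        SketchClientFactory factory = () -> {
            // Only the first client reports a lost connection.
            final boolean connected = created.incrementAndGet() > 1;
            return new SketchClient() {
                @Override public boolean isConnected() { return connected; }
                @Override public void close() { closed.incrementAndGet(); }
            };
        };
        ReconnectingSinkSketch sink = new ReconnectingSinkSketch(factory);
        sink.open();
        sink.invoke("a"); // first client is disconnected: connect() closes it and creates a second
        sink.invoke("b"); // second client is connected: no further reconnect
        return new int[] {created.get(), closed.get()};
    }
}
```

In this sketch `ReconnectDemo.run()` exercises one reconnect: two clients are created and the first one is closed, while open() itself runs only once.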
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-217126041

    The change is missing documentation updates & test cases.
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-217125875

    Thank you for opening the pull request. I made some inline comments.

    I don't think the proposed changes fix the issue described in the JIRA. I would check on each `invoke()` if `hasFailure` is set. If that's the case, you can reconnect to ES.
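One way to read the suggestion to check `hasFailure` on each `invoke()` is sketched below. This is an illustration under assumptions, not the connector's code: `FailureAwareSinkSketch`, `Reconnector`, `markFailed()` and the `sent` list are hypothetical, and how the real sink sets `hasFailure` is assumed rather than taken from this thread. The sketch retries the reconnect a bounded number of times and, unlike the reviewed diff, always forwards the element once the connection is healthy again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class FailureAwareSinkSketch {
    /** Hypothetical hook standing in for the real reconnect; returns true on success. */
    interface Reconnector {
        boolean tryReconnect();
    }

    private final AtomicBoolean hasFailure = new AtomicBoolean(false);
    private final int maxRetries;
    private final Reconnector reconnector;

    /** Stand-in for "sent to Elasticsearch", so the sketch is observable. */
    final List<String> sent = new ArrayList<>();

    FailureAwareSinkSketch(int maxRetries, Reconnector reconnector) {
        this.maxRetries = maxRetries;
        this.reconnector = reconnector;
    }

    /** Assumed to be called from wherever the sink detects a failed request. */
    void markFailed() {
        hasFailure.set(true);
    }

    void invoke(String element) {
        if (hasFailure.get()) {
            boolean reconnected = false;
            for (int attempt = 0; attempt < maxRetries && !reconnected; attempt++) {
                reconnected = reconnector.tryReconnect();
            }
            if (!reconnected) {
                throw new RuntimeException(
                    "Could not reconnect after " + maxRetries + " attempts");
            }
            hasFailure.set(false);
        }
        // The element is forwarded on both paths, so it is not dropped after a reconnect.
        sent.add(element);
    }
}
```

The common path costs a single `AtomicBoolean` read per element; the retry loop only runs after a failure has been flagged.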
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62173382

    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -177,7 +180,72 @@ public void open(Configuration configuration) {
             if (LOG.isInfoEnabled()) {
                 LOG.info("Created Elasticsearch TransportClient {}", client);
             }
    +    }
    +
    +    @Override
    +    public void invoke(final T element) {
    +        ParameterTool params = ParameterTool.fromMap(userConfig);
    +
    +        if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
    +            final Timer timer = new Timer(true);
    +            TimerTask task = new TimerTask() {
    +                @Override
    +                public void run() {
    +                    // verify that we actually are connected to a cluster
    +                    ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
    +                    if (nodes.isEmpty()) {
    +                        if (LOG.isInfoEnabled()) {
    +                            LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
    +                        }
    +                        open(new Configuration());
    --- End diff --

    The old client is not closed.
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62173042

    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -177,7 +180,72 @@ public void open(Configuration configuration) {
             if (LOG.isInfoEnabled()) {
                 LOG.info("Created Elasticsearch TransportClient {}", client);
             }
    +    }
    +
    +    @Override
    +    public void invoke(final T element) {
    +        ParameterTool params = ParameterTool.fromMap(userConfig);
    +
    +        if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
    +            final Timer timer = new Timer(true);
    +            TimerTask task = new TimerTask() {
    +                @Override
    +                public void run() {
    +                    // verify that we actually are connected to a cluster
    +                    ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
    +                    if (nodes.isEmpty()) {
    +                        if (LOG.isInfoEnabled()) {
    +                            LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
    +                        }
    +                        open(new Configuration());
    +                    } else {
    +                        timer.cancel();
    +                        intializeAndCallElasticSearchSinkFunction(element);
    +                    }
    +                }
    +            };
    +
    +            timer.scheduleAtFixedRate(task, 0, 3000);
    +
    +            try {
    +                Thread.sleep(3000 * params.getInt(CONFIG_NO_OF_CONN_RETRIES));
    +            } catch (InterruptedException e) {
    +                throw new RuntimeException(e.getMessage());
    +            }
    +            timer.cancel();
    +            // verify that we actually are connected to a cluster
    +            ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
    +            if (nodes.isEmpty()) {
    +                throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
    +            }
    +        } else {
    +            intializeAndCallElasticSearchSinkFunction(element);
    +        }
    +    }
    +
    +    @Override
    +    public void close() {
    +        if (bulkProcessor != null) {
    +            bulkProcessor.close();
    +            bulkProcessor = null;
    +        }
    +
    +        if (client != null) {
    +            client.close();
    +        }
    +
    +        if (hasFailure.get()) {
    +            Throwable cause = failureThrowable.get();
    +            if (cause != null) {
    +                throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
    +            } else {
    +                throw new RuntimeException("An error occured in ElasticsearchSink.");
    +            }
    +        }
    +    }
    +
    +    private void intializeAndCallElasticSearchSinkFunction(T element) {
    --- End diff --

    Where is the method initializing the ES sink?
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62172954

    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -177,7 +180,72 @@ public void open(Configuration configuration) {
             if (LOG.isInfoEnabled()) {
                 LOG.info("Created Elasticsearch TransportClient {}", client);
             }
    +    }
    +
    +    @Override
    +    public void invoke(final T element) {
    +        ParameterTool params = ParameterTool.fromMap(userConfig);
    +
    +        if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
    +            final Timer timer = new Timer(true);
    +            TimerTask task = new TimerTask() {
    +                @Override
    +                public void run() {
    +                    // verify that we actually are connected to a cluster
    +                    ImmutableList nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
    +                    if (nodes.isEmpty()) {
    +                        if (LOG.isInfoEnabled()) {
    +                            LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
    +                        }
    +                        open(new Configuration());
    --- End diff --

    The passed `element` is never sent to ES in this case, right?
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62172911

    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -177,7 +180,72 @@ public void open(Configuration configuration) {
             if (LOG.isInfoEnabled()) {
                 LOG.info("Created Elasticsearch TransportClient {}", client);
             }
    +    }
    +
    +    @Override
    +    public void invoke(final T element) {
    +        ParameterTool params = ParameterTool.fromMap(userConfig);
    +
    +        if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
    --- End diff --

    This is two hashmap lookups for each element. The check can be done only once, in the open() method.
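The point about doing the configuration check once in open() could be sketched as follows. This is a simplified illustration, not the connector's code: it reads from a plain `Map<String, String>` instead of Flink's `ParameterTool` so that it is self-contained, the class name `CachedConfigSinkSketch` is hypothetical, and the key string `"connection.retries"` is an assumed value (only the constant name `CONFIG_KEY_CONNECTION_RETRIES` comes from this review thread).

```java
import java.util.Map;

class CachedConfigSinkSketch {
    // Constant name as proposed in the review; the key string itself is an assumption.
    static final String CONFIG_KEY_CONNECTION_RETRIES = "connection.retries";

    private final Map<String, String> userConfig;

    /** Resolved once in open(); 0 means reconnect attempts are disabled. */
    private int connectionRetries;

    CachedConfigSinkSketch(Map<String, String> userConfig) {
        this.userConfig = userConfig;
    }

    /** Called once: the only place where the user configuration is parsed. */
    void open() {
        String raw = userConfig.get(CONFIG_KEY_CONNECTION_RETRIES);
        connectionRetries = (raw == null) ? 0 : Integer.parseInt(raw);
    }

    int getConnectionRetries() {
        return connectionRetries;
    }

    /** Per-element path: a plain int comparison, no map lookup and no new objects. */
    boolean shouldAttemptReconnect() {
        return connectionRetries > 0;
    }
}
```

With the value cached in a field, the per-element code no longer builds a parameter object or touches the map at all.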
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62172611

    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -177,7 +180,72 @@ public void open(Configuration configuration) {
             if (LOG.isInfoEnabled()) {
                 LOG.info("Created Elasticsearch TransportClient {}", client);
             }
    +    }
    +
    +    @Override
    +    public void invoke(final T element) {
    +        ParameterTool params = ParameterTool.fromMap(userConfig);
    --- End diff --

    It's very inefficient to create a new `ParameterTool` instance for each incoming element.
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62172571

    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -86,6 +88,7 @@
         public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
         public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
         public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
    +    public static final String CONFIG_NO_OF_CONN_RETRIES = "conn.retries";
    --- End diff --

    We usually don't abbreviate configuration keys. Can you rename it to "CONFIG_KEY_CONNECTION_RETRIES"?
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
Github user sbcd90 commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-216874916

    Hello @fhueske,

    The tests do not fail because of the changes made in this PR. I ran the JUnit tests for the Elasticsearch connector and all of them pass. Could you kindly have a look?
[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...
GitHub user sbcd90 opened a pull request:

    https://github.com/apache/flink/pull/1962

    [FLINK-3857][Streaming Connectors]Add reconnect attempt to Elasticsearch host

    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-3857] Add reconnect attempt to Elasticsearch host")
    - [ ] Documentation
      - Documentation added based on the changes made.
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sbcd90/flink elasticSearchRetryIssue

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1962.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1962

commit 62990c984f0d2eca3ba89ed9c2d22c469f16b136
Author: Subhobrata Dey
Date:   2016-05-04T02:16:35Z

    [FLINK-3857][Streaming Connectors]Add reconnect attempt to Elasticsearch host