[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user static-max closed the pull request at: https://github.com/apache/flink/pull/2861 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user static-max commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r94382504

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +198,47 @@ public void beforeBulk(long executionId, BulkRequest request) {
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                boolean allRequestsRepeatable = true;
                 if (response.hasFailures()) {
                     for (BulkItemResponse itemResp : response.getItems()) {
                         if (itemResp.isFailed()) {
-                            LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            // Check if index request can be retried
+                            String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                            if (checkErrorAndRetryBulk && (
+                                failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                                || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                                || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                                )
+                            ) {
+                                LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
+                                reAddBulkRequest(request);
--- End diff --

You're right, it gets added multiple times. I'll fix that.
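The double-add problem acknowledged above can be illustrated with a small, dependency-free sketch. `ItemResult` and `RetryOnce` are hypothetical stand-ins invented for this illustration, not Elasticsearch or Flink classes, and collecting only the failed items is one possible fix, not necessarily the one the author applied.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one item of a bulk response; not an Elasticsearch class. */
final class ItemResult {
    final String request;        // stand-in for the original ActionRequest
    final String failureMessage; // null when the item succeeded

    ItemResult(String request, String failureMessage) {
        this.request = request;
        this.failureMessage = failureMessage;
    }

    boolean isFailed() {
        return failureMessage != null;
    }
}

final class RetryOnce {
    /** Counts how often the whole bulk is re-added when the re-add happens inside the loop. */
    static int reAddsInsideLoop(List<ItemResult> items) {
        int reAdds = 0;
        for (ItemResult item : items) {
            if (item.isFailed()) {
                reAdds++; // mirrors calling reAddBulkRequest(request) once per failed item
            }
        }
        return reAdds;
    }

    /** Collects each failed request exactly once, so it can be re-added after the loop. */
    static List<String> failedRequests(List<ItemResult> items) {
        List<String> toRetry = new ArrayList<>();
        for (ItemResult item : items) {
            if (item.isFailed()) {
                toRetry.add(item.request);
            }
        }
        return toRetry;
    }

    public static void main(String[] args) {
        List<ItemResult> items = new ArrayList<>();
        items.add(new ItemResult("doc-1", "timed out"));
        items.add(new ItemResult("doc-2", null));
        items.add(new ItemResult("doc-3", "timeout"));

        System.out.println("re-adds inside loop: " + reAddsInsideLoop(items));
        System.out.println("requests to retry once: " + failedRequests(items));
    }
}
```

With two failed items, the in-loop variant would re-add the complete bulk twice, including the document that already succeeded, whereas the collected list contains each failed request a single time.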
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r93049169

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -227,6 +264,37 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
         requestIndexer = new BulkProcessorIndexer(bulkProcessor);
     }
+    /**
+     * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
+     * @param bulkRequest
+     */
+    public void reAddBulkRequest(BulkRequest bulkRequest) {
+        //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
+
+        for (IndicesRequest req : bulkRequest.subRequests()) {
+            if (req instanceof ActionRequest) {
+                // There is no waiting time between index requests, so this may produce additional pressure on cluster
+                bulkProcessor.add((ActionRequest) req);
--- End diff --

Do you know if the BulkProcessor is thread safe? I assume multiple threads will add bulks concurrently (because of the calls from the callbacks).
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r93048695

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +198,47 @@ public void beforeBulk(long executionId, BulkRequest request) {
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                boolean allRequestsRepeatable = true;
                 if (response.hasFailures()) {
                     for (BulkItemResponse itemResp : response.getItems()) {
                         if (itemResp.isFailed()) {
-                            LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            // Check if index request can be retried
+                            String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                            if (checkErrorAndRetryBulk && (
+                                failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                                || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                                || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                                )
+                            ) {
+                                LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
+                                reAddBulkRequest(request);
--- End diff --

Are you sure that the `BulkRequest` is only added once even if it contains multiple failed `BulkItemResponse`s?
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r93045771

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                boolean allRequestsRepeatable = true;
                 if (response.hasFailures()) {
                     for (BulkItemResponse itemResp : response.getItems()) {
                         if (itemResp.isFailed()) {
-                            LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            // Check if index request can be retried
+                            String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                            if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                                || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                                || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                                ) {
+                                LOG.debug("Retry batch: " + itemResp.getFailureMessage());
--- End diff --

I cannot assess how often retries are needed. Users can also manually increase the log level if needed. So we can leave it as is. However, I'm wondering whether we want to include a metric that counts the number of retries that occurred.
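The retry metric suggested above could look roughly like the following. This is a minimal sketch that uses a plain `AtomicLong` so that it runs without Flink on the classpath; `RetryCounter` is a hypothetical name. In the sink itself one would presumably register a Flink `Counter` through the runtime context's metric group instead, which is an assumption about the eventual design and not something the PR contains.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical retry counter; stands in for a metric registered with the metrics system. */
final class RetryCounter {
    // AtomicLong because the bulk callbacks may run on several threads.
    private final AtomicLong retries = new AtomicLong();

    /** Call once for every request that is handed back to the bulk processor. */
    void recordRetry() {
        retries.incrementAndGet();
    }

    /** Returns the number of retries recorded so far. */
    long count() {
        return retries.get();
    }

    public static void main(String[] args) {
        RetryCounter counter = new RetryCounter();
        counter.recordRetry();
        counter.recordRetry();
        System.out.println("retries so far: " + counter.count());
    }
}
```

A counter like this keeps the log at DEBUG level quiet while still making the retry rate visible to operators.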
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r93043759

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                boolean allRequestsRepeatable = true;
                 if (response.hasFailures()) {
                     for (BulkItemResponse itemResp : response.getItems()) {
                         if (itemResp.isFailed()) {
-                            LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            // Check if index request can be retried
+                            String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                            if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
--- End diff --

Okay. I agree that there seems to be no better way to handle this.
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user static-max commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r91991599

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -227,6 +254,21 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
         requestIndexer = new BulkProcessorIndexer(bulkProcessor);
     }
+    /**
+     * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
+     * @param bulkRequest
+     */
+    public void reAddBulkRequest(BulkRequest bulkRequest) {
+        //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
--- End diff --

Currently I'm not aware of a way to filter these requests.
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user static-max commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r91964180

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                boolean allRequestsRepeatable = true;
                 if (response.hasFailures()) {
                     for (BulkItemResponse itemResp : response.getItems()) {
                         if (itemResp.isFailed()) {
-                            LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            // Check if index request can be retried
+                            String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                            if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                                || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                                || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                                ) {
+                                LOG.debug("Retry batch: " + itemResp.getFailureMessage());
--- End diff --

What level do you suggest? Personally, I don't care about retried batches as long as the data gets into my ES cluster. When this is logged as INFO or WARN, the log file will get pretty messy on a cluster with high traffic.
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user static-max commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r91963737

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                boolean allRequestsRepeatable = true;
                 if (response.hasFailures()) {
                     for (BulkItemResponse itemResp : response.getItems()) {
                         if (itemResp.isFailed()) {
-                            LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            // Check if index request can be retried
+                            String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                            if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
--- End diff --

I didn't find an alternative to checking strings. I will add a flag and disable it by default.
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r90916177

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                boolean allRequestsRepeatable = true;
                 if (response.hasFailures()) {
                     for (BulkItemResponse itemResp : response.getItems()) {
                         if (itemResp.isFailed()) {
-                            LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            // Check if index request can be retried
+                            String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                            if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                                || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                                || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                                ) {
+                                LOG.debug("Retry batch: " + itemResp.getFailureMessage());
--- End diff --

I would log here at a higher logging level. Also, could you avoid string concatenation here and use the `LOG.debug("Retry batch: {}", itemResp.getFailureMessage());` pattern instead?
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r90916759

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                boolean allRequestsRepeatable = true;
                 if (response.hasFailures()) {
                     for (BulkItemResponse itemResp : response.getItems()) {
                         if (itemResp.isFailed()) {
-                            LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            // Check if index request can be retried
+                            String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                            if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
--- End diff --

This string-based error matching seems to be a pretty unstable mechanism. Can you add a flag to control whether the mechanism is enabled, and disable it by default (but document it on the ES connector page)?
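A flag-guarded version of the string matching requested above might be sketched as follows. The flag name `checkErrorAndRetryBulk` is taken from the later revision of the diff quoted in this thread; the `RetryClassifier` class, its constructor, and the `disabledByDefault()` factory are hypothetical names used only for this illustration. The sketch drops the extra `contains("bulk")` check from the diff, because any message containing `data/write/bulk` already contains `bulk`.

```java
import java.util.Locale;

/** Hypothetical helper that isolates the string-based retry decision behind a flag. */
final class RetryClassifier {
    private final boolean checkErrorAndRetryBulk;

    RetryClassifier(boolean checkErrorAndRetryBulk) {
        this.checkErrorAndRetryBulk = checkErrorAndRetryBulk;
    }

    /** The mechanism is off unless the user explicitly enables it. */
    static RetryClassifier disabledByDefault() {
        return new RetryClassifier(false);
    }

    /** Returns true only if the flag is set and the message looks like a temporary error. */
    boolean isRetryable(String failureMessage) {
        if (!checkErrorAndRetryBulk || failureMessage == null) {
            return false;
        }
        String message = failureMessage.toLowerCase(Locale.ROOT);
        return message.contains("timeout") || message.contains("timed out") // generic timeout errors
            || message.contains("unavailableshardsexception")               // shard unavailable (rebalancing, node down)
            || message.contains("data/write/bulk");                         // bulk index queue on node full
    }

    public static void main(String[] args) {
        RetryClassifier enabled = new RetryClassifier(true);
        System.out.println(enabled.isRetryable("Request timed out"));
        System.out.println(disabledByDefault().isRetryable("Request timed out"));
    }
}
```

Keeping the matching in one place also gives a single spot to adjust if Elasticsearch changes the wording of its failure messages, which is the instability the review comment points at.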
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r90917049

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                boolean allRequestsRepeatable = true;
                 if (response.hasFailures()) {
                     for (BulkItemResponse itemResp : response.getItems()) {
                         if (itemResp.isFailed()) {
-                            LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            // Check if index request can be retried
+                            String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                            if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                                || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                                || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                                ) {
+                                LOG.debug("Retry batch: " + itemResp.getFailureMessage());
+                                reAddBulkRequest(request);
+                            } else { // Cannot retry action
+                                allRequestsRepeatable = false;
+                                LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
+                                failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            }
                         }
                     }
-                    hasFailure.set(true);
+                    if (!allRequestsRepeatable) {
+                        hasFailure.set(true);
+                    }
                 }
             }
             @Override
             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-                LOG.error(failure.getMessage());
-                failureThrowable.compareAndSet(null, failure);
-                hasFailure.set(true);
+                if (failure instanceof ClusterBlockException // Examples: "no master"
+                    || failure instanceof ElasticsearchTimeoutException // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
+                    )
+                {
+                    LOG.debug("Retry batch on throwable: " + failure.getMessage());
+                    reAddBulkRequest(request);
+                } else {
+                    LOG.error("Failed to index bulk in Elasticsearch. " + failure.getMessage());
--- End diff --

String concat
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r90916979

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                boolean allRequestsRepeatable = true;
                 if (response.hasFailures()) {
                     for (BulkItemResponse itemResp : response.getItems()) {
                         if (itemResp.isFailed()) {
-                            LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            // Check if index request can be retried
+                            String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                            if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                                || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                                || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                                ) {
+                                LOG.debug("Retry batch: " + itemResp.getFailureMessage());
+                                reAddBulkRequest(request);
+                            } else { // Cannot retry action
+                                allRequestsRepeatable = false;
+                                LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
--- End diff --

{} instead of string concat.
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r90917303

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -227,6 +254,21 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
         requestIndexer = new BulkProcessorIndexer(bulkProcessor);
     }
+    /**
+     * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
+     * @param bulkRequest
+     */
+    public void reAddBulkRequest(BulkRequest bulkRequest) {
+        //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
--- End diff --

So what about this TODO? Can we somehow filter these requests?
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r90917039

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                boolean allRequestsRepeatable = true;
                 if (response.hasFailures()) {
                     for (BulkItemResponse itemResp : response.getItems()) {
                         if (itemResp.isFailed()) {
-                            LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            // Check if index request can be retried
+                            String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                            if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                                || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                                || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                                ) {
+                                LOG.debug("Retry batch: " + itemResp.getFailureMessage());
+                                reAddBulkRequest(request);
+                            } else { // Cannot retry action
+                                allRequestsRepeatable = false;
+                                LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
+                                failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                            }
                         }
                     }
-                    hasFailure.set(true);
+                    if (!allRequestsRepeatable) {
+                        hasFailure.set(true);
+                    }
                 }
             }
             @Override
             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-                LOG.error(failure.getMessage());
-                failureThrowable.compareAndSet(null, failure);
-                hasFailure.set(true);
+                if (failure instanceof ClusterBlockException // Examples: "no master"
+                    || failure instanceof ElasticsearchTimeoutException // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
+                    )
+                {
+                    LOG.debug("Retry batch on throwable: " + failure.getMessage());
--- End diff --

String concat
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
GitHub user static-max opened a pull request:

    https://github.com/apache/flink/pull/2861

    [FLINK-5122] Index requests will be retried if the error is only temp…

    This PR will re-add index requests to the BulkProcessor if the error is temporary, like

    * General timeout errors
    * No master
    * UnavailableShardsException (Rebalancing, Node down)
    * Bulk queue on node full

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/static-max/flink flink-connector-elasticsearch2-robust

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2861.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2861

commit 2ea8bd099100203d73af9b3a5e616e6d6d1cd50d
Author: Max Kuklinski
Date:   2016-11-23T16:54:11Z

    [FLINK-5122] Index requests will be retried if the error is only temporary on Elasticsearch side. Covered are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full