[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2017-03-03 Thread static-max
Github user static-max closed the pull request at:

https://github.com/apache/flink/pull/2861


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2017-01-03 Thread static-max
Github user static-max commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r94382504
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +198,47 @@ public void beforeBulk(long executionId, BulkRequest request) {

     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        boolean allRequestsRepeatable = true;
         if (response.hasFailures()) {
             for (BulkItemResponse itemResp : response.getItems()) {
                 if (itemResp.isFailed()) {
-                    LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                    failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    // Check if index request can be retried
+                    String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                    if (checkErrorAndRetryBulk && (
+                            failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                            || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                            || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                    )) {
+                        LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
+                        reAddBulkRequest(request);
--- End diff --

You're right, it gets added multiple times; I'll fix that.
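The fix described above can be sketched with a guard flag: re-add the bulk at most once per `afterBulk` call, no matter how many of its items failed. The Elasticsearch types are replaced here by plain Java stand-ins and `reAddBulkRequest` is a simplified counter, so this only illustrates the guard idea, not the connector API:

```java
import java.util.List;

public class RetryOnceSketch {
    // Counts how often the whole bulk was re-added (stand-in for the real re-add).
    static int reAddCalls = 0;

    static void reAddBulkRequest(List<String> bulk) {
        reAddCalls++;
    }

    // itemFailed / itemRetryable stand in for inspecting each BulkItemResponse.
    static void afterBulk(List<String> bulk, List<Boolean> itemFailed, List<Boolean> itemRetryable) {
        boolean bulkReAdded = false; // guard: re-add the request only once
        for (int i = 0; i < bulk.size(); i++) {
            if (itemFailed.get(i) && itemRetryable.get(i) && !bulkReAdded) {
                reAddBulkRequest(bulk);
                bulkReAdded = true;
            }
        }
    }
}
```

With two failed retryable items, the bulk is still re-added only once.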




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r93049169
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -227,6 +264,37 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
     requestIndexer = new BulkProcessorIndexer(bulkProcessor);
 }

+    /**
+     * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
+     * @param bulkRequest
+     */
+    public void reAddBulkRequest(BulkRequest bulkRequest) {
+        //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
+
+        for (IndicesRequest req : bulkRequest.subRequests()) {
+            if (req instanceof ActionRequest) {
+                // There is no waiting time between index requests, so this may produce additional pressure on cluster
+                bulkProcessor.add((ActionRequest) req);
--- End diff --

Do you know if the `BulkProcessor` is thread-safe? I assume multiple threads will add bulks concurrently (because of the calls from the callbacks).
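Whether the client's `BulkProcessor` synchronizes internally would need checking against the Elasticsearch client source for the version in use. If it cannot be confirmed, one defensive option is to funnel every `add()` through a single synchronized wrapper. This is a generic sketch: the `Consumer` stands in for the real `BulkProcessor`, which is not available here:

```java
import java.util.function.Consumer;

public class SynchronizedIndexer<T> {
    private final Consumer<T> delegate;
    private final Object lock = new Object();

    public SynchronizedIndexer(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    // All callers (the sink's invoke() path and the retry callbacks) serialize
    // on one lock, so the delegate never sees concurrent add() calls.
    public void add(T request) {
        synchronized (lock) {
            delegate.accept(request);
        }
    }
}
```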




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r93048695
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +198,47 @@ public void beforeBulk(long executionId, BulkRequest request) {

     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        boolean allRequestsRepeatable = true;
         if (response.hasFailures()) {
             for (BulkItemResponse itemResp : response.getItems()) {
                 if (itemResp.isFailed()) {
-                    LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                    failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    // Check if index request can be retried
+                    String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                    if (checkErrorAndRetryBulk && (
+                            failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                            || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                            || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                    )) {
+                        LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
+                        reAddBulkRequest(request);
--- End diff --

Are you sure that the `BulkRequest` is only added once even if it contains 
multiple failed `BulkItemResponse`s?




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r93045771
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        boolean allRequestsRepeatable = true;
         if (response.hasFailures()) {
             for (BulkItemResponse itemResp : response.getItems()) {
                 if (itemResp.isFailed()) {
-                    LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                    failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    // Check if index request can be retried
+                    String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                    if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                            || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                            || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                    ) {
+                        LOG.debug("Retry batch: " + itemResp.getFailureMessage());
--- End diff --

I cannot assess how often retries are needed.
Users can also manually increase the log level if needed, so we can leave it as is.
However, I'm wondering whether we want to include a metric that counts the number of retries that occurred.
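A retry counter along these lines could be wired up roughly as follows. The `AtomicLong` is a stand-in for Flink's metrics `Counter` (in a real sink one would register a counter on the operator's metric group via the runtime context), and the `onRetry` hook name is made up for the sketch:

```java
import java.util.concurrent.atomic.AtomicLong;

public class RetryMetricSketch {
    // Stand-in for a Flink metrics Counter; in the connector this would be
    // registered on the operator's metric group instead of being a static field.
    static final AtomicLong numRetries = new AtomicLong();

    // Called once per re-added bulk, so operators can see retry frequency
    // without raising the log level.
    static void onRetry(String failureMessage) {
        numRetries.incrementAndGet();
    }
}
```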




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r93043759
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        boolean allRequestsRepeatable = true;
         if (response.hasFailures()) {
             for (BulkItemResponse itemResp : response.getItems()) {
                 if (itemResp.isFailed()) {
-                    LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                    failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    // Check if index request can be retried
+                    String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                    if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
--- End diff --

Okay. I agree that there seems to be no better way to handle this.




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-12 Thread static-max
Github user static-max commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r91991599
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -227,6 +254,21 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
     requestIndexer = new BulkProcessorIndexer(bulkProcessor);
 }

+    /**
+     * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
+     * @param bulkRequest
+     */
+    public void reAddBulkRequest(BulkRequest bulkRequest) {
+        //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
--- End diff --

Currently I'm not aware of a way to filter these requests.
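If filtering ever becomes possible, one conceivable shape is an instanceof check while re-adding, skipping delete actions whose "document already gone" failure is effectively harmless. The request types below are hypothetical stand-ins, not the real Elasticsearch classes (the connector iterates `IndicesRequest` instances from `bulkRequest.subRequests()`), so this only shows the filtering idea:

```java
import java.util.ArrayList;
import java.util.List;

public class FilterDeletesSketch {
    // Hypothetical stand-ins for Elasticsearch's IndexRequest / DeleteRequest.
    interface Request {}
    static class IndexRequest implements Request {}
    static class DeleteRequest implements Request {}

    // Keep only non-delete actions for the retry; a failed delete of an
    // already-deleted document does not need to be repeated.
    static List<Request> retryableSubset(List<Request> subRequests) {
        List<Request> out = new ArrayList<>();
        for (Request r : subRequests) {
            if (!(r instanceof DeleteRequest)) {
                out.add(r);
            }
        }
        return out;
    }
}
```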




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-12 Thread static-max
Github user static-max commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r91964180
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        boolean allRequestsRepeatable = true;
         if (response.hasFailures()) {
             for (BulkItemResponse itemResp : response.getItems()) {
                 if (itemResp.isFailed()) {
-                    LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                    failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    // Check if index request can be retried
+                    String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                    if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                            || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                            || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                    ) {
+                        LOG.debug("Retry batch: " + itemResp.getFailureMessage());
--- End diff --

What level do you suggest? Personally I don't care about retried batches as long as the data gets into my ES cluster. When logging this at INFO or WARN, the logfile will get pretty messy on a cluster with high traffic.




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-12 Thread static-max
Github user static-max commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r91963737
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        boolean allRequestsRepeatable = true;
         if (response.hasFailures()) {
             for (BulkItemResponse itemResp : response.getItems()) {
                 if (itemResp.isFailed()) {
-                    LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                    failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    // Check if index request can be retried
+                    String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                    if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
--- End diff --

I didn't find an alternative to checking strings. I will add a flag and disable it by default.
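The string checks from the diff can be isolated into one predicate, which also makes the behavior easy to unit-test. This mirrors the conditions in the patch (timeouts, `UnavailableShardsException`, full bulk queue); it is a sketch, not connector API, and the bulk-queue check is simplified to a single `contains` since matching `data/write/bulk` already implies matching `bulk`:

```java
public class TransientFailureSketch {
    // True if the failure message looks like a temporary condition worth
    // retrying: generic timeouts, shards unavailable during rebalancing or
    // node loss, or a full bulk queue on a node.
    static boolean isTransientFailure(String failureMessage) {
        String m = failureMessage.toLowerCase();
        return m.contains("timeout")
                || m.contains("timed out")
                || m.contains("unavailableshardsexception")
                || m.contains("data/write/bulk"); // bulk queue full; rejection message names the bulk action
    }
}
```

A mapping failure, for example, is permanent and must not be retried, so it falls through to the error path.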




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90916177
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        boolean allRequestsRepeatable = true;
         if (response.hasFailures()) {
             for (BulkItemResponse itemResp : response.getItems()) {
                 if (itemResp.isFailed()) {
-                    LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                    failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    // Check if index request can be retried
+                    String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                    if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                            || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                            || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                    ) {
+                        LOG.debug("Retry batch: " + itemResp.getFailureMessage());
--- End diff --

I would log here at a higher logging level.
Also, could you avoid string concatenation here and use the `LOG.debug("Retry batch: {}", itemResp.getFailureMessage())` pattern?




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90916759
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        boolean allRequestsRepeatable = true;
         if (response.hasFailures()) {
             for (BulkItemResponse itemResp : response.getItems()) {
                 if (itemResp.isFailed()) {
-                    LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                    failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    // Check if index request can be retried
+                    String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                    if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
--- End diff --

This string-based error matching seems to be a pretty unstable mechanism.
Can you add a flag to control whether the mechanism is enabled, and disable it by default (but document it on the ES connector page)?




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90917049
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        boolean allRequestsRepeatable = true;
         if (response.hasFailures()) {
             for (BulkItemResponse itemResp : response.getItems()) {
                 if (itemResp.isFailed()) {
-                    LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                    failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    // Check if index request can be retried
+                    String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                    if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                            || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                            || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                    ) {
+                        LOG.debug("Retry batch: " + itemResp.getFailureMessage());
+                        reAddBulkRequest(request);
+                    } else { // Cannot retry action
+                        allRequestsRepeatable = false;
+                        LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
+                        failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    }
                 }
             }
-            hasFailure.set(true);
+            if (!allRequestsRepeatable) {
+                hasFailure.set(true);
+            }
         }
     }

     @Override
     public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-        LOG.error(failure.getMessage());
-        failureThrowable.compareAndSet(null, failure);
-        hasFailure.set(true);
+        if (failure instanceof ClusterBlockException // Examples: "no master"
+                || failure instanceof ElasticsearchTimeoutException // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
+        ) {
+            LOG.debug("Retry batch on throwable: " + failure.getMessage());
+            reAddBulkRequest(request);
+        } else {
+            LOG.error("Failed to index bulk in Elasticsearch. " + failure.getMessage());
--- End diff --

String concat




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90916979
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        boolean allRequestsRepeatable = true;
         if (response.hasFailures()) {
             for (BulkItemResponse itemResp : response.getItems()) {
                 if (itemResp.isFailed()) {
-                    LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                    failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    // Check if index request can be retried
+                    String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                    if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                            || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                            || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                    ) {
+                        LOG.debug("Retry batch: " + itemResp.getFailureMessage());
+                        reAddBulkRequest(request);
+                    } else { // Cannot retry action
+                        allRequestsRepeatable = false;
+                        LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
--- End diff --

{} instead of string concat.




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90917303
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -227,6 +254,21 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
     requestIndexer = new BulkProcessorIndexer(bulkProcessor);
 }

+    /**
+     * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
+     * @param bulkRequest
+     */
+    public void reAddBulkRequest(BulkRequest bulkRequest) {
+        //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
--- End diff --

So what about this TODO? Can we somehow filter these requests?




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90917039
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

     @Override
     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        boolean allRequestsRepeatable = true;
         if (response.hasFailures()) {
             for (BulkItemResponse itemResp : response.getItems()) {
                 if (itemResp.isFailed()) {
-                    LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-                    failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    // Check if index request can be retried
+                    String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+                    if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+                            || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+                            || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+                    ) {
+                        LOG.debug("Retry batch: " + itemResp.getFailureMessage());
+                        reAddBulkRequest(request);
+                    } else { // Cannot retry action
+                        allRequestsRepeatable = false;
+                        LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
+                        failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+                    }
                 }
             }
-            hasFailure.set(true);
+            if (!allRequestsRepeatable) {
+                hasFailure.set(true);
+            }
         }
     }

     @Override
     public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-        LOG.error(failure.getMessage());
-        failureThrowable.compareAndSet(null, failure);
-        hasFailure.set(true);
+        if (failure instanceof ClusterBlockException // Examples: "no master"
+                || failure instanceof ElasticsearchTimeoutException // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
+        ) {
+            LOG.debug("Retry batch on throwable: " + failure.getMessage());
--- End diff --

String concat




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-11-23 Thread static-max
GitHub user static-max opened a pull request:

https://github.com/apache/flink/pull/2861

[FLINK-5122] Index requests will be retried if the error is only temp…

This PR will re-add index requests to the BulkProcessor if the error is temporary, like:
* General timeout errors
* No master
* UnavailableShardsException (rebalancing, node down)
* Bulk queue on node full

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/static-max/flink 
flink-connector-elasticsearch2-robust

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2861.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2861


commit 2ea8bd099100203d73af9b3a5e616e6d6d1cd50d
Author: Max Kuklinski 
Date:   2016-11-23T16:54:11Z

[FLINK-5122] Index requests will be retried if the error is only temporary 
on Elasticsearch side. Covered are: Timeouts, No Master, 
UnavailableShardsException, bulk queue on node full



