[jira] [Commented] (FLUME-2254) Improve log when there is a failure in a BulkRequest in ES Sink and avoid stuck failures in the channel

2015-09-25 Thread Francis (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14908113#comment-14908113
 ] 

Francis commented on FLUME-2254:


This issue has been in progress for 3 years. Do you have an ETA? This is a 
major problem for us. If we ever send an invalid event by mistake, Flume 
continually resends the whole batch until we restart it. When an unrecoverable 
error occurs while sending events to ES, Flume should not try to send them 
again! This is really easy to reproduce: just send an event in which a field 
has a string value where ES expects an integer value.
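
As a rough illustration of the behavior requested above, the sketch below 
splits the per-event results of a failed bulk request into events worth 
retrying and events that should be logged and dropped. Every type and name in 
it (BulkFailureTriage, Outcome, ItemResult, Triage) is hypothetical and stands 
in for the sink's real classes; it is not Flume or ElasticSearch API.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: all types here are hypothetical stand-ins, not Flume or ES classes. */
final class BulkFailureTriage {

    /** Hypothetical per-event outcome of one bulk request. */
    enum Outcome { OK, RETRYABLE, UNRECOVERABLE }

    /** Hypothetical pairing of an event body with its outcome. */
    static final class ItemResult {
        final String eventBody;
        final Outcome outcome;

        ItemResult(String eventBody, Outcome outcome) {
            this.eventBody = eventBody;
            this.outcome = outcome;
        }
    }

    /** Events to send again, and events to log and drop. */
    static final class Triage {
        final List<String> retry = new ArrayList<>();
        final List<String> dropped = new ArrayList<>();
    }

    /** Sorts failed events by whether resending them could ever succeed. */
    static Triage triage(List<ItemResult> results) {
        Triage triage = new Triage();
        for (ItemResult result : results) {
            switch (result.outcome) {
                case RETRYABLE:
                    triage.retry.add(result.eventBody);
                    break;
                case UNRECOVERABLE:
                    // For example, a string sent where the mapping expects an integer.
                    triage.dropped.add(result.eventBody);
                    break;
                default:
                    // Indexed successfully: nothing left to do for this event.
                    break;
            }
        }
        return triage;
    }

    public static void main(String[] args) {
        List<ItemResult> results = new ArrayList<>();
        results.add(new ItemResult("{\"count\": 1}", Outcome.OK));
        results.add(new ItemResult("{\"count\": \"abc\"}", Outcome.UNRECOVERABLE));
        results.add(new ItemResult("{\"count\": 2}", Outcome.RETRYABLE));

        Triage triage = triage(results);
        System.out.println("retry=" + triage.retry + " dropped=" + triage.dropped);
    }
}
```

With a split like this, only the retry list would need to go back through the 
channel, so a single malformed event could no longer block the whole batch. How 
a real sink would decide that a failure is unrecoverable is left open here.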

> Improve log when there is a failure in a BulkRequest in ES Sink and avoid 
> stuck failures in the channel
> ---
>
> Key: FLUME-2254
> URL: https://issues.apache.org/jira/browse/FLUME-2254
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.4.0
> Reporter: Luis Pigueiras
> Assignee: Ashish Paliwal
> Priority: Minor
>
> I found two problems when a {{BulkRequest}} to ElasticSearch fails in the 
> ElasticSearch sink. 
> The first is that if a single insertion inside the bulk fails, all the events 
> from the BulkRequest get stuck in the channel (even the ones that did not 
> fail, because one failure fails the entire request), and our indexes grow 
> because Flume keeps retrying and re-inserts the same error-free events into 
> ES again and again. 
> The second is that it is difficult to see in the logs which events caused 
> the request to fail. Currently you can only see the error output from 
> ElasticSearch, and it would be very nice to know which events are not 
> being inserted into ElasticSearch.
> My proposal to solve this is:
> - Save all the Events of the BulkRequest in some structure.
> - Remove the throw of {{EventDeliveryException}}.
> - If there is a failure, iterate over the events, sending one request to ES 
> per event, to find out which events cannot be inserted. If one of them 
> fails, print the error and the event to the logs.
> Here is an example of how this can be implemented. It is not the smartest 
> way to do it, but it works for me.
> {code}
> @@ -81,6 +80,8 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
>      try {
>        txn.begin();
>        BulkRequestBuilder bulkRequest = client.prepareBulk();
> +      LinkedList<Event> events = new LinkedList<Event>();
> +
>        for (int i = 0; i < batchSize; i++) {
>          Event event = channel.take();
>
> @@ -88,14 +89,14 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
>            break;
>          }
>
>          IndexRequestBuilder indexRequest =
>              indexRequestFactory.createIndexRequest(
>                  client, indexName, indexType, event);
>
>          if (ttlMs > 0) {
>            indexRequest.setTTL(ttlMs);
>          }
>
> +        events.add(event);
>          bulkRequest.add(indexRequest);
>        }
> @@ -116,7 +117,35 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
>
>          BulkResponse bulkResponse = bulkRequest.execute().actionGet();
>          if (bulkResponse.hasFailures()) {
> -          throw new EventDeliveryException(bulkResponse.buildFailureMessage());
> +          logger
> +              .warn("There is a failure in the bulk request with this output\n"
> +                  + bulkResponse.buildFailureMessage());
> +          logger
> +              .warn("Trying to do the requests separately to know what notifications"
> +                  + " of the requests have errors");
> +          for (Event event : events) {
> +            try {
> +              indexRequestFactory
> +                  .createIndexRequest(client, indexName, indexType, event)
> +                  .execute().actionGet();
> +            } catch (Exception e) {
> +              logger.error(e.getMessage());
> +
> +              if (serializer instanceof ElasticSearchEventSerializer) {
> +                XContentBuilder builder = (XContentBuilder) ((ElasticSearchEventSerializer) serializer)
> +                    .getContentBuilder(event);
> +                logger
> +                    .error("There is an error with the following notification:\n"
> +                        + builder.string());
> +              } else {
> +                logger
> +                    .error("There is no possibility of printing the notification because"
> +                        + " the serializer is a different instance from"
> +                        + " ElasticSearchEventSerializer");
> +              }

[jira] [Commented] (FLUME-2254) Improve log when there is a failure in a BulkRequest in ES Sink and avoid stuck failures in the channel

2013-12-16 Thread Ashish Paliwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13850176#comment-13850176
 ] 

Ashish Paliwal commented on FLUME-2254:
---

This looks like an expectation of exactly-once semantics in case of an error. 

[~hshreedharan] AFAIK, we don't have exactly-once semantics in Flume, and 
duplicate events may be present in failure scenarios. Do we want to fix 
this?
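
For context on the duplicates mentioned above, here is a minimal sketch of one 
common way to tolerate redelivery: derive the document ID from the event body, 
so that a resent event overwrites its earlier copy instead of adding a new 
document. This is an illustration under stated assumptions, not something the 
sink does; the class IdempotentIndexSketch and its in-memory map standing in 
for an index are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

/** Sketch only: an in-memory map stands in for an index keyed by document ID. */
final class IdempotentIndexSketch {

    private final Map<String, String> index = new HashMap<>();

    /** Derives a stable hex ID from the event body using SHA-256. */
    static String documentId(byte[] body) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest(body)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be supported by every Java platform implementation.
            throw new IllegalStateException(e);
        }
    }

    /** Stores the event under its derived ID; a redelivered event overwrites itself. */
    void index(byte[] body) {
        index.put(documentId(body), new String(body, StandardCharsets.UTF_8));
    }

    int size() {
        return index.size();
    }

    public static void main(String[] args) {
        IdempotentIndexSketch sketch = new IdempotentIndexSketch();
        byte[] event = "{\"count\": 1}".getBytes(StandardCharsets.UTF_8);
        sketch.index(event);
        sketch.index(event); // simulated retry of the same event
        System.out.println("documents stored: " + sketch.size());
    }
}
```

The trade-off is that two genuinely distinct events with identical bodies would 
collapse into one document, so a real ID would probably also need a header such 
as a timestamp or sequence number.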

> Improve log when there is a failure in a BulkRequest in ES Sink and avoid 
> stuck failures in the channel
> ---
>
> Key: FLUME-2254
> URL: https://issues.apache.org/jira/browse/FLUME-2254
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.4.0
> Reporter: Luis Pigueiras
> Assignee: Ashish Paliwal
> Priority: Minor
>
> I found two problems when a {{BulkRequest}} to ElasticSearch fails in the 
> ElasticSearch sink. 
> The first is that if a single insertion inside the bulk fails, all the events 
> from the BulkRequest get stuck in the channel (even the ones that did not 
> fail, because one failure fails the entire request), and our indexes grow 
> because Flume keeps retrying and re-inserts the same error-free events into 
> ES again and again. 
> The second is that it is difficult to see in the logs which events caused 
> the request to fail. Currently you can only see the error output from 
> ElasticSearch, and it would be very nice to know which events are not 
> being inserted into ElasticSearch.
> My proposal to solve this is:
> - Save all the Events of the BulkRequest in some structure.
> - Remove the throw of {{EventDeliveryException}}.
> - If there is a failure, iterate over the events, sending one request to ES 
> per event, to find out which events cannot be inserted. If one of them 
> fails, print the error and the event to the logs.
> Here is an example of how this can be implemented. It is not the smartest 
> way to do it, but it works for me.
> {code}
> @@ -81,6 +80,8 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
>      try {
>        txn.begin();
>        BulkRequestBuilder bulkRequest = client.prepareBulk();
> +      LinkedList<Event> events = new LinkedList<Event>();
> +
>        for (int i = 0; i < batchSize; i++) {
>          Event event = channel.take();
>
> @@ -88,14 +89,14 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
>            break;
>          }
>
>          IndexRequestBuilder indexRequest =
>              indexRequestFactory.createIndexRequest(
>                  client, indexName, indexType, event);
>
>          if (ttlMs > 0) {
>            indexRequest.setTTL(ttlMs);
>          }
>
> +        events.add(event);
>          bulkRequest.add(indexRequest);
>        }
> @@ -116,7 +117,35 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
>
>          BulkResponse bulkResponse = bulkRequest.execute().actionGet();
>          if (bulkResponse.hasFailures()) {
> -          throw new EventDeliveryException(bulkResponse.buildFailureMessage());
> +          logger
> +              .warn("There is a failure in the bulk request with this output\n"
> +                  + bulkResponse.buildFailureMessage());
> +          logger
> +              .warn("Trying to do the requests separately to know what notifications"
> +                  + " of the requests have errors");
> +          for (Event event : events) {
> +            try {
> +              indexRequestFactory
> +                  .createIndexRequest(client, indexName, indexType, event)
> +                  .execute().actionGet();
> +            } catch (Exception e) {
> +              logger.error(e.getMessage());
> +
> +              if (serializer instanceof ElasticSearchEventSerializer) {
> +                XContentBuilder builder = (XContentBuilder) ((ElasticSearchEventSerializer) serializer)
> +                    .getContentBuilder(event);
> +                logger
> +                    .error("There is an error with the following notification:\n"
> +                        + builder.string());
> +              } else {
> +                logger
> +                    .error("There is no possibility of printing the notification because"
> +                        + " the serializer is a different instance from"
> +                        + " ElasticSearchEventSerializer");
> +              }
> +
> +            }
> +          }
>          }
>        }
>        txn.commit();
> {code}



--
This message was sent by Atlassian JIRA
(v6.1.4#6159)


[jira] [Commented] (FLUME-2254) Improve log when there is a failure in a BulkRequest in ES Sink and avoid stuck failures in the channel

2013-12-11 Thread Ashish Paliwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13845168#comment-13845168
 ] 

Ashish Paliwal commented on FLUME-2254:
---

I am planning to spend some time with ES and will give this a try.
