[ 
https://issues.apache.org/jira/browse/FLINK-12551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunxiongkun closed FLINK-12551.
-------------------------------
    Resolution: Not A Bug

> elasticsearch6 connector print log error
> ----------------------------------------
>
>                 Key: FLINK-12551
>                 URL: https://issues.apache.org/jira/browse/FLINK-12551
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.6.3
>            Reporter: sunxiongkun
>            Priority: Minor
>
> When I use the Elasticsearch connector and my project is running, I find that
> some data is not inserted into Elasticsearch. I wanted to read the log for help,
> but the log does not contain the important message, so I read the source code
> (org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase) and
> found an error in the way the ERROR log is written.
>  
> {code:java}
> @Override
> public void afterBulk(long executionId, BulkRequest request, BulkResponse 
> response) {
>  if (response.hasFailures()) {
>   BulkItemResponse itemResponse;
>   Throwable failure;
>   RestStatus restStatus;
>   try {
>    for (int i = 0; i < response.getItems().length; i++) {
>     itemResponse = response.getItems()[i];
>     failure = 
> callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
>     if (failure != null) {
>      LOG.error("Failed Elasticsearch item request: {}", 
> itemResponse.getFailureMessage(), failure);
>      restStatus = itemResponse.getFailure().getStatus();
>      if (restStatus == null) {
>       failureHandler.onFailure(request.requests().get(i), failure, -1, 
> requestIndexer);
>      } else {
>       failureHandler.onFailure(request.requests().get(i), failure, 
> restStatus.getStatus(), requestIndexer);
>      }
>     }
>    }
>   } catch (Throwable t) {
>    // fail the sink and skip the rest of the items
>    // if the failure handler decides to throw an exception
>    failureThrowable.compareAndSet(null, t);
>   }
>  }
>  if (flushOnCheckpoint) {
>   numPendingRequests.getAndAdd(-request.numberOfActions());
>  }
> }
> {code}
> {code:java}
> @Override
>  public void afterBulk(long executionId, BulkRequest request, Throwable 
> failure) {
>   LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), 
> failure.getCause());
>   try {
>    for (ActionRequest action : request.requests()) {
>     failureHandler.onFailure(action, failure, -1, requestIndexer);
>    }
>   } catch (Throwable t) {
>    // fail the sink and skip the rest of the items
>    // if the failure handler decides to throw an exception
>    failureThrowable.compareAndSet(null, t);
>   }
>   if (flushOnCheckpoint) {
>    numPendingRequests.getAndAdd(-request.numberOfActions());
>   }
>  }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to