[ https://issues.apache.org/jira/browse/FLINK-12551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sunxiongkun closed FLINK-12551. ------------------------------- Resolution: Not A Bug > elasticsearch6 connector print log error > ---------------------------------------- > > Key: FLINK-12551 > URL: https://issues.apache.org/jira/browse/FLINK-12551 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch > Affects Versions: 1.6.3 > Reporter: sunxiongkun > Priority: Minor > > When I use the Elasticsearch connector and my project is running, I find that some > data is not inserted into Elasticsearch, so I wanted to read the log for help, but the > log does not contain the important message, so I read the source code > (org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase) and > found an error in how the ERROR log is written. > > {code:java} > @Override > public void afterBulk(long executionId, BulkRequest request, BulkResponse > response) { > if (response.hasFailures()) { > BulkItemResponse itemResponse; > Throwable failure; > RestStatus restStatus; > try { > for (int i = 0; i < response.getItems().length; i++) { > itemResponse = response.getItems()[i]; > failure = > callBridge.extractFailureCauseFromBulkItemResponse(itemResponse); > if (failure != null) { > LOG.error("Failed Elasticsearch item request: {}", > itemResponse.getFailureMessage(), failure); > restStatus = itemResponse.getFailure().getStatus(); > if (restStatus == null) { > failureHandler.onFailure(request.requests().get(i), failure, -1, > requestIndexer); > } else { > failureHandler.onFailure(request.requests().get(i), failure, > restStatus.getStatus(), requestIndexer); > } > } > } > } catch (Throwable t) { > // fail the sink and skip the rest of the items > // if the failure handler decides to throw an exception > failureThrowable.compareAndSet(null, t); > } > } > if (flushOnCheckpoint) { > numPendingRequests.getAndAdd(-request.numberOfActions()); > } > } > {code} > {code:java} > @Override > public void afterBulk(long executionId, BulkRequest request, Throwable > failure) { > LOG.error("Failed Elasticsearch 
bulk request: {}", failure.getMessage(), > failure.getCause()); > try { > for (ActionRequest action : request.requests()) { > failureHandler.onFailure(action, failure, -1, requestIndexer); > } > } catch (Throwable t) { > // fail the sink and skip the rest of the items > // if the failure handler decides to throw an exception > failureThrowable.compareAndSet(null, t); > } > if (flushOnCheckpoint) { > numPendingRequests.getAndAdd(-request.numberOfActions()); > } > } > } > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)