Shengnan YU created FLINK-14938:
-----------------------------------

             Summary: Flink elasticsearch failure handler re-add indexrequest causes ConcurrentModificationException
                 Key: FLINK-14938
                 URL: https://issues.apache.org/jira/browse/FLINK-14938
             Project: Flink
          Issue Type: Bug
          Components: Connectors / ElasticSearch
    Affects Versions: 1.8.1
            Reporter: Shengnan YU

When using the Elasticsearch connector's failure handler (taken from the official example) to re-add documents, Flink throws a ConcurrentModificationException.

{code:java}
input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                // full queue; re-add document for indexing
                indexer.add(action);
            }
        }
    }));
{code}

I found that the method processBufferedRequests of BufferingNoOpRequestIndexer iterates over a list of ActionRequest objects. However, the failure handler keeps re-adding requests to that same list after a bulk request completes, which causes the ConcurrentModificationException.

I think this is a multi-threading bug. Do we need a thread-safe List to hold the failed requests?
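
The failure mode described above can be reproduced outside Flink. The following is a minimal sketch in plain Java, not the connector's actual code: the class RetryBufferDemo, its method names, and the "rejected" prefix convention are all hypothetical, and a list of strings stands in for the buffered ActionRequest objects. It shows that adding to an ArrayList while iterating over it triggers ConcurrentModificationException even on a single thread, and that iterating over a snapshot is one possible way to avoid it. Whether the re-add in Flink happens on the same thread or a different one is not established here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical stand-in for a request buffer; not part of Flink.
class RetryBufferDemo {

    // Iterates the buffer while a simulated failure handler re-adds to the
    // same list. Returns true if the iterator detected the modification.
    static boolean replayInPlace(List<String> buffer) {
        try {
            for (String request : buffer) {
                buffer.add(request); // simulated handler: re-add during iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Copies the buffer and clears it before iterating, so re-adds go into
    // the (now empty) buffer instead of the list being iterated.
    static List<String> replaySnapshot(List<String> buffer) {
        List<String> snapshot = new ArrayList<>(buffer);
        buffer.clear();
        for (String request : snapshot) {
            // Assumption for illustration: "rejected" requests are retried.
            if (request.startsWith("rejected")) {
                buffer.add(request);
            }
        }
        return buffer;
    }

    public static void main(String[] args) {
        List<String> a = new ArrayList<>(Arrays.asList("rejected-1", "ok-2"));
        System.out.println("in-place replay threw CME: " + replayInPlace(a));

        List<String> b = new ArrayList<>(Arrays.asList("rejected-1", "ok-2"));
        System.out.println("left for retry after snapshot replay: " + replaySnapshot(b));
    }
}
```

The snapshot approach only removes the in-iteration modification. If the re-add can also arrive from another thread, the buffer would additionally need synchronization or a concurrent collection such as java.util.concurrent.ConcurrentLinkedQueue, whose iterators are weakly consistent and do not throw ConcurrentModificationException.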