Re: How to use ExceptionUtils.findThrowable() in Scala

2019-02-07 Thread Averell
P.S. This is the full stack trace:

2019-02-07 01:53:12.790 [I/O dispatcher 16] ERROR o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase  - Failed Elasticsearch item request: [...][[...][1]]
ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][...]: version conflict, document already exists (current version [1])]]
org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][...]: version conflict, document already exists (current version [1])]
    at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
    at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
    at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
    at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
    at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
    at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
    at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
    at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
    at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
    at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:748)
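
The trace above reports the failure as a plain org.elasticsearch.ElasticsearchException with no "Caused by" entries, and the conflict appears only in its message (type=version_conflict_engine_exception). A class-based lookup for VersionConflictEngineException would therefore not be expected to match that object. One possible workaround, shown here only as a minimal sketch, is to search the messages in the cause chain for that type marker. The object name VersionConflictCheck and both of its methods are hypothetical helpers, not part of Flink or Elasticsearch, and matching on message text is an assumption that may not hold across Elasticsearch versions.

```scala
import scala.annotation.tailrec

// Hypothetical helper, not part of Flink or Elasticsearch.
object VersionConflictCheck {

  // Type marker as it appears in the logged exception message above.
  val Marker = "version_conflict_engine_exception"

  // Walks the cause chain and returns true if any message contains `marker`.
  // A null throwable or a null message is treated as "no match".
  @tailrec
  def causeChainContains(t: Throwable, marker: String): Boolean =
    if (t == null) false
    else if (Option(t.getMessage).exists(_.contains(marker))) true
    else causeChainContains(t.getCause, marker)

  // Assumption: a version conflict is recognisable by the marker in a message.
  def isVersionConflict(failure: Throwable): Boolean =
    causeChainContains(failure, Marker)
}
```

Inside onFailure, the condition could then be VersionConflictCheck.isVersionConflict(failure) instead of the class lookup. The restStatusCode argument (409 in the log output of the original message) might serve as an additional signal, assuming 409 is only returned for conflicts in the setup at hand.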



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


How to use ExceptionUtils.findThrowable() in Scala

2019-02-07 Thread Averell
Hello,

I am trying to implement error handling in the Elasticsearch sink, following the
seemingly outdated Flink documentation [1]:


override def onFailure(actionRequest: ActionRequest, failure: Throwable,
    restStatusCode: Int, indexer: RequestIndexer): Unit = {
  if (ExceptionUtils.findThrowable(failure,
      classOf[org.elasticsearch.index.engine.VersionConflictEngineException]) !=
      Optional.empty()) {
    LOG.warn("Failed inserting record to ElasticSearch: statusCode {} message: {} record: {} stacktrace {}.\nRetrying",
      restStatusCode.toString, failure.getMessage, actionRequest.toString, failure.getStackTrace)
    // Do something here
  }
  else {
    LOG.error(s"ELASTICSEARCH FAILED:\nstatusCode $restStatusCode\n   message: ${failure.getMessage}\n${failure.getStackTrace}")
  }
}


I tried to handle VersionConflictEngineException differently, but it did not
work: execution always reaches the "else" branch, so my log message is:
/ELASTICSEARCH FAILED:
statusCode 409
message: Elasticsearch exception
[type=version_conflict_engine_exception, reason=[_doc][...]: version
conflict, document already exists (current version [1])]
/
Thanks and best regards,
Averell

[1]  handling-failing-elasticsearch-requests

  



