After some investigation of the TaskManager's log, I found that the exception was raised
from the RetryRejectedExecutionFailureHandler path. The related logs are
shown below; I'm not sure why this happens.










































* 5978 2021-06-05 05:31:31,529 INFO
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler
[] - Bulk request 1033 has been cancelled. 5979
java.lang.InterruptedException: null 5980 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
~[?:1.8.0_272] 5981 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
~[?:1.8.0_272] 5982 at
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
~[?:1.8.0_272] 5983 at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78)
~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] 5984 at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455)
~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 5985 at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464)
~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 5986 at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330)
~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1] 5987 at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300)
~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 5988 at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354)
~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 5989 at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
~[flink-dist_2.11-1.13.1.jar:1.13.1] 5990 at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
~[flink-dist_2.11-1.13.1.jar:1.13.1] 5991 at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861)
~[flink-dist_2.11-1.13.1.jar:1.13.1] 5992 at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840)
[flink-dist_2.11-1.13.1.jar:1.13.1] 5993 at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753)
[flink-dist_2.11-1.13.1.jar:1.13.1] 5994 at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659)
[flink-dist_2.11-1.13.1.jar:1.13.1] 5995 at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
[flink-dist_2.11-1.13.1.jar:1.13.1] 5996 at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
[flink-dist_2.11-1.13.1.jar:1.13.1] 5997 at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
[flink-dist_2.11-1.13.1.jar:1.13.1] 5998 at
java.lang.Thread.run(Thread.java:748) [?:1.8.0_272] 5999 2021-06-05
05:31:31,530 ERROR
org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler
[] - Failed Elasticsearch item request: null 6000
java.lang.InterruptedException: null 6001 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
~[?:1.8.0_272] 6002 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
~[?:1.8.0_272] 6003 at
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
~[?:1.8.0_272] 6004 at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78)
~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] 6005 at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455)
~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6006 at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464)
~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6007 at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330)
~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1] 6008 at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300)
~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6009 at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354)
~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6010 at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
~[flink-dist_2.11-1.13.1.jar:1.13.1] 6011 at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
~[flink-dist_2.11-1.13.1.jar:1.13.1] 6012 at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861)
~[flink-dist_2.11-1.13.1.jar:1.13.1] 6013 at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840)
[flink-dist_2.11-1.13.1.jar:1.13.1] 6014 at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753)
[flink-dist_2.11-1.13.1.jar:1.13.1] 6015 at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659)
[flink-dist_2.11-1.13.1.jar:1.13.1] 6016 at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
[flink-dist_2.11-1.13.1.jar:1.13.1] 6017 at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
[flink-dist_2.11-1.13.1.jar:1.13.1]*












* 6030 2021-06-05 05:31:31,633 ERROR
org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler
[] - Failed Elasticsearch item request: Connection closed unexpectedly 6031
org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException:
Connection closed unexpectedly 6032 at
org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:146)
[flink-sql-connector-elasticsearch7_2.11- 1.13.1.jar:1.13.1] 6033 at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71)
[flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1] 6034 at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39)
[flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1] 6035 at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:100)
[flink-sql-connector-elasticsearch7_2.11-1. 13.1.jar:1.13.1] 6036 at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:277)
[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] 6037 at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:449)
[flink-sql-connector- elasticsearch7_2.11-1.13.1.jar:1.13.1] 6038 at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:283)
[flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1] 6039 at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6040 at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
[flink-sql-connector- elasticsearch7_2.11-1.13.1.jar:1.13.1] 6041 at
java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]*

On Sat, Jun 5, 2021 at 12:13 PM Kai Fu <zzfu...@gmail.com> wrote:

> Hi team,
>
> We are encountering ES sink connector timeouts quite frequently.
> We checked, and the ES cluster is far from heavily loaded (~40% CPU utilization, no
> queries, and a low indexing rate). We're using the ES-7 connector, with 12 data
> nodes and a parallelism of 32.
>
> The error log is below. We'd like to know whether there is any way to locate
> the root cause, or to configure the timeout parameter.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/
>
> *2021-06-05 11:49:10*
> *java.lang.RuntimeException: An error occurred in ElasticsearchSink.*
> *    at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)*
> *    at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)*
> *    at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)*
> *    at
> org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)*
> *    at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)*
> *    at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)*
> *    at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)*
> *    at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)*
> *    at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)*
> *    at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)*
> *    at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)*
> *    at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)*
> *    at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)*
> *    at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)*
> *    at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)*
> *    at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)*
> *    at
> org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112)*
> *    at
> org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80)*
> *    at
> org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32)*
> *    at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)*
> *    at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)*
> *    at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)*
> *    at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)*
> *    at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)*
> *    at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)*
> *    at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)*
> *    at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)*
> *    at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)*
> *    at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)*
> *    at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)*
> *    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)*
> *    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)*
> *    at java.lang.Thread.run(Thread.java:748)*
> *Caused by: java.net.SocketTimeoutException: 30,000
> milliseconds timeout on connection http-outgoing-21 [ACTIVE]*
> *    at
> org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)*
> *    at
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)*
> *    at
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)*
> *    at
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)*
> *    at
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)*
> *    at
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)*
> *    at
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)*
> *    at
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)*
> *    at
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)*
> *    at
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)*
> *    ... 1 more*
>
> Config:
>
>
>
> *WITH (    'connector' = 'elasticsearch-7',    'hosts' = 'https://xxx:443',    'index' = 'xxx',*
>
>
>
>
>
>
>
> *    'sink.bulk-flush.max-actions' = '10000',
> 'sink.bulk-flush.max-size' = '2mb',    'sink.flush-on-checkpoint' =
> 'true',    'connection.max-retry-timeout' = '60s',    'failure-handler' =
> 'retry-rejected',    'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
> 'sink.bulk-flush.interval' = '2s');*
>
> --
> *Best wishes,*
> *- Kai*
>


-- 
*Best wishes,*
*- Kai*

Reply via email to