Hi team,

We frequently encounter timeouts from the Elasticsearch (ES) sink connector.
As far as we can tell, the ES cluster is far from overloaded (~40% CPU
utilization, no queries, and a low indexing rate). We're using the ES-7
connector against a cluster with 12 data nodes, and the sink runs with a
parallelism of 32.

The error log is below. Is there any way to locate the cause of the issue,
or to configure the connector's timeout parameter [1]?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/

2021-06-05 11:49:10
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
    at org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112)
    at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80)
    at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-21 [ACTIVE]
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    ... 1 more
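The `Caused by` frames come from the shaded Apache HttpAsyncClient I/O reactor. Connection http-outgoing-21 was still open ([ACTIVE]), but it saw no I/O activity for 30,000 ms. That figure appears to match the default socket timeout of the Elasticsearch low-level REST client. Below is a minimal sketch of the same failure mode. It assumes a plain blocking `java.net.Socket` rather than the non-blocking client the connector actually uses, and the class `SocketTimeoutDemo` and the 200 ms timeout are illustrative only:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

/**
 * Illustration only (plain blocking java.net, not the connector's shaded
 * async HttpClient): the connection is established and stays open, but the
 * peer never answers, so the read fails once the socket timeout elapses.
 */
class SocketTimeoutDemo {

    /** Returns true if a read on an open but silent connection times out. */
    static boolean readTimesOut(int timeoutMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // The server accepts the connection but never writes a response.
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {
            client.setSoTimeout(timeoutMillis); // analogous to the 30,000 ms socket timeout in the log
            InputStream in = client.getInputStream();
            try {
                in.read(); // blocks: no bytes ever arrive
                return false;
            } catch (SocketTimeoutException e) {
                return true; // only the wait for data expired; the socket is still connected
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + readTimesOut(200));
    }
}
```

Running `main` prints `timed out: true`. The client socket is still connected when the exception is raised, which has the same shape as the [ACTIVE] connection in the log.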

Config:

    WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'https://xxx:443',
        'index' = 'xxx',
        'sink.bulk-flush.max-actions' = '10000',
        'sink.bulk-flush.max-size' = '2mb',
        'sink.flush-on-checkpoint' = 'true',
        'connection.max-retry-timeout' = '60s',
        'failure-handler' = 'retry-rejected',
        'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
        'sink.bulk-flush.interval' = '2s'
    );
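The three `sink.bulk-flush.*` limits in this config act as independent triggers: a buffered bulk request is flushed as soon as any one of them is reached. The sketch below is a hypothetical model of that rule and is not the connector's implementation. The class `BulkFlushModel` and the 1 KiB average document size are assumptions for illustration. The model reads '2mb' as 2 MiB and reduces the periodic flush to a simple elapsed-time check:

```java
/**
 * Hypothetical model of the bulk-flush triggers configured above.
 * It mirrors the documented option semantics only; it is not connector code.
 */
class BulkFlushModel {
    static final int MAX_ACTIONS = 10_000;               // 'sink.bulk-flush.max-actions' = '10000'
    static final long MAX_SIZE_BYTES = 2L * 1024 * 1024; // 'sink.bulk-flush.max-size' = '2mb' (read as 2 MiB)
    static final long INTERVAL_MILLIS = 2_000;           // 'sink.bulk-flush.interval' = '2s'

    /** Returns the first limit that triggers a flush, or null if none is reached yet. */
    static String flushTrigger(int bufferedActions, long bufferedBytes, long millisSinceLastFlush) {
        if (bufferedActions >= MAX_ACTIONS) return "max-actions";
        if (bufferedBytes >= MAX_SIZE_BYTES) return "max-size";
        // Simplification: the real sink flushes on a background timer.
        if (millisSinceLastFlush >= INTERVAL_MILLIS) return "interval";
        return null;
    }

    /** Number of buffered actions at which the size limit is reached for a given average document size. */
    static long actionsUntilSizeLimit(long avgDocBytes) {
        return (MAX_SIZE_BYTES + avgDocBytes - 1) / avgDocBytes; // ceiling division
    }

    public static void main(String[] args) {
        // Assumed ~1 KiB per action (illustrative, not measured).
        System.out.println(actionsUntilSizeLimit(1024));
        System.out.println(flushTrigger(2048, 2048L * 1024, 500));
    }
}
```

With about 1 KiB per action, the model fires on the size limit after 2048 actions, well before the 10000-action limit is reached.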

-- 
Best wishes,
- Kai
