Hi Ren,

I think the root cause is that you didn't set a proper failure handler for the 
Elasticsearch connector; the `RetryRejectedExecutionFailureHandler` can resolve 
your issue. See the Elasticsearch connector docs[1] for more information. 
You can also set 'connector.failure-handler' to 'retry-rejected' in your 
Elasticsearch table DDL if you're using Flink SQL rather than the DataStream 
API.
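
For the DataStream API, a minimal sketch could look like the following (the 
host, index name, and surrounding class are placeholders for your own setup, 
not taken from your job):

    import java.util.Collections;

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;
    import org.elasticsearch.common.xcontent.XContentType;

    public class EsSinkExample {
        // Attach an Elasticsearch sink whose failure handler re-queues bulk
        // requests rejected with EsRejectedExecutionException instead of
        // rethrowing them (the default NoOpFailureHandler rethrows, which
        // fails the sink).
        public static void addEsSink(DataStream<String> stream) {
            ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                    Collections.singletonList(new HttpHost("es-host", 9200, "http")), // placeholder host
                    (String element, RuntimeContext ctx, RequestIndexer indexer) -> {
                        IndexRequest request = Requests.indexRequest()
                                .index("my-index") // placeholder index name
                                .source(element, XContentType.JSON);
                        indexer.add(request);
                    });

            // Re-queue rejected requests for retry instead of failing the job.
            builder.setFailureHandler(new RetryRejectedExecutionFailureHandler());

            // Optional: smaller bulk batches put less pressure on a loaded cluster.
            builder.setBulkFlushMaxActions(100);

            stream.addSink(builder.build());
        }
    }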

Btw, please use English on user@flink.apache.org, or send Chinese emails to 
user...@flink.apache.org for better communication.

Best,
Leonard
[1] https://nightlies.apache.org/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html



> On Dec 3, 2021, at 2:26 PM, 淘宝龙安 <rentb...@gmail.com> wrote:
> 
> Hi all,
>   
> I've run into a very hard problem to solve. My scenario is a very simple and 
> common one: consume data from Kafka and write it to ES. But when the ES 
> cluster is under heavy load and starts rejecting writes 
> (es_rejected_execution_exception), the whole Flink job hangs: it stops 
> consuming data, it does not restart, and every checkpoint fails. I've found 
> some clues, but I still can't find where the problem is.
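> 
> A minimal sketch of the shape of my pipeline (the topic, group id, host, 
> index name, and class names here are placeholders, not my actual 
> configuration):
> 
>     import java.util.Collections;
>     import java.util.Properties;
> 
>     import org.apache.flink.api.common.functions.RuntimeContext;
>     import org.apache.flink.api.common.serialization.SimpleStringSchema;
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>     import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
>     import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>     import org.apache.http.HttpHost;
>     import org.elasticsearch.client.Requests;
>     import org.elasticsearch.common.xcontent.XContentType;
> 
>     public class KafkaToEsJob {
>         public static void main(String[] args) throws Exception {
>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>             env.enableCheckpointing(60_000); // placeholder checkpoint interval
> 
>             Properties props = new Properties();
>             props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
>             props.setProperty("group.id", "my-group");            // placeholder
> 
>             DataStream<String> stream = env.addSource(
>                     new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props));
> 
>             ElasticsearchSink.Builder<String> esBuilder = new ElasticsearchSink.Builder<>(
>                     Collections.singletonList(new HttpHost("es-host", 9200, "http")),
>                     (String element, RuntimeContext ctx, RequestIndexer indexer) ->
>                             indexer.add(Requests.indexRequest()
>                                     .index("my-index")
>                                     .source(element, XContentType.JSON)));
>             // No failure handler is set, so the default NoOpFailureHandler is used.
>             stream.addSink(esBuilder.build());
> 
>             env.execute("kafka-to-es");
>         }
>     }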
> 
> Symptoms: 
> When write rejections occur, the TaskManager logs the error below. After 
> that, the whole job is stuck and no longer consumes data, regardless of 
> whether the ES cluster has recovered.
> 
> 2021-10-11 08:07:28,804 I/O dispatcher 6 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase  - 
> Failed Elasticsearch item request: ElasticsearchException[Elasticsearch 
> exception [type=es_rejected_execution_exception, reason=rejected execution of 
> processing of [1477079498][indices:data/write/bulk[s][p]]: request: 
> BulkShardRequest [[acs_period_stl_index_2021_03_30_19_06_21][0]] containing 
> [100] requests, target allocation id: rnOmk_VtSgGAOT8e0dCefQ, primary term: 1 
> on EsThreadPoolExecutor[name = VECS014179/write, queue capacity = 200, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@640a33fe[Running,
>  pool size = 8, active threads = 8, queued tasks = 226, completed tasks = 
> 1171462808]]]]
> ElasticsearchException[Elasticsearch exception 
> [type=es_rejected_execution_exception, reason=rejected execution of 
> processing of [1477079498][indices:data/write/bulk[s][p]]: request: 
> BulkShardRequest [[acs_period_stl_index_2021_03_30_19_06_21][0]] containing 
> [100] requests, target allocation id: rnOmk_VtSgGAOT8e0dCefQ, primary term: 1 
> on EsThreadPoolExecutor[name = VECS014179/write, queue capacity = 200, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@640a33fe[Running,
>  pool size = 8, active threads = 8, queued tasks = 226, completed tasks = 
> 1171462808]]]]
> at 
> org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:491)
> at 
> org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:402)
> at 
> org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:139)
> at 
> org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:199)
> at 
> org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1727)
> at 
> org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1520)
> at 
> org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1598)
> at 
> org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:556)
> at org.elasticsearch.client.RestClient$1.completed(RestClient.java:300)
> at org.elasticsearch.client.RestClient$1.completed(RestClient.java:294)
> at 
> com.huster.hidden.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
> at 
> com.huster.hidden.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
> at 
> com.huster.hidden.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
> at 
> com.huster.hidden.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
> at 
> com.huster.hidden.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
> at 
> com.huster.hidden.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
> at 
> com.huster.hidden.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
> at 
> com.huster.hidden.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
> at 
> com.huster.hidden.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
> at 
> com.huster.hidden.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
> at 
> com.huster.hidden.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
> at 
> com.huster.hidden.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
> at 
> com.huster.hidden.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
> at 
> com.huster.hidden.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
> at java.lang.Thread.run(Thread.java:748)
> 
> 
> Checkpoint configuration:
> <image.png>
> 
> All checkpoints fail:
> 
> <image.png>
> 
> 
> 
> I'm using Flink 1.10.0, and the ES connector is 
> flink-connector-elasticsearch7_2.11, v1.10.0. I've seen this with ES clusters 
> on both 7.5.2 and 7.15.0, and it's not just my code: other people have run 
> into the same problem.
> 
> When the job is deployed to a Flink YARN cluster, it reproduces 100% of the 
> time; when I debug it locally in IDEA, it happens occasionally but is not 
> 100% reproducible:
> 
> The jstack output from when it hung is in the attachment. Here is the debug 
> log from the last time it died:
> 
> 2021-12-02 22:06:55.627 [Checkpoint Timer] INFO  
> o.a.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 20 
> @ 1638454015627 for job d5a71c0aca220cf880f24bbac4fc7c8f.
> 2021-12-02 22:06:55.628 [flink-akka.actor.default-dispatcher-62] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Trigger checkpoint 
> 20@1638454015627 for e2fa126a099ade3e581253235ffdc797.
> 2021-12-02 22:06:59.946 [Flink-DispatcherRestEndpoint-thread-2] DEBUG 
> o.a.f.r.r.handler.legacy.metrics.MetricFetcherImpl - Start fetching metrics.
> 2021-12-02 22:06:59.947 [Flink-DispatcherRestEndpoint-thread-4] DEBUG 
> o.a.f.r.r.handler.legacy.metrics.MetricFetcherImpl - Retrieve metric query 
> service gateway for 
> akka.tcp://flink-metrics@172.19.179.143:58419/user/MetricQueryService
> 2021-12-02 22:06:59.947 [Flink-DispatcherRestEndpoint-thread-4] DEBUG 
> org.apache.flink.runtime.rpc.akka.AkkaRpcService - Try to connect to remote 
> RPC endpoint with address 
> akka.tcp://flink-metrics@172.19.179.143:58419/user/MetricQueryService. 
> Returning a 
> org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway 
> gateway.
> 2021-12-02 22:06:59.951 [Flink-DispatcherRestEndpoint-thread-3] DEBUG 
> org.apache.flink.runtime.rpc.akka.AkkaRpcService - Try to connect to remote 
> RPC endpoint with address 
> akka.tcp://flink-metrics@172.19.179.143:58419/user/MetricQueryService. 
> Returning a 
> org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway 
> gateway.
> 2021-12-02 22:06:59.953 [Flink-DispatcherRestEndpoint-thread-2] DEBUG 
> o.a.f.r.r.handler.legacy.metrics.MetricFetcherImpl - Query metrics for 
> akka.tcp://flink-metrics@172.19.179.143:58419/user/MetricQueryService.
> 2021-12-02 22:06:59.953 [Flink-DispatcherRestEndpoint-thread-4] DEBUG 
> o.a.f.r.r.handler.legacy.metrics.MetricFetcherImpl - Query metrics for 
> akka.tcp://flink-metrics@172.19.179.143:58419/user/MetricQueryService.
> 2021-12-02 22:07:00.113 [flink-rest-server-netty-worker-thread-14] DEBUG 
> o.a.f.r.r.h.legacy.files.StaticFileServerHandler - Responding 'NOT MODIFIED' 
> for file 
> '/private/var/folders/n4/sj54y_5d1cj1wd0h_kgz1z4r0000gn/T/flink-web-ui/index.html'
> 2021-12-02 22:07:04.145 [flink-akka.actor.default-dispatcher-60] DEBUG 
> o.a.f.r.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
> 2021-12-02 22:07:04.149 [flink-akka.actor.default-dispatcher-68] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat 
> request from 2859ebe1eeb6acc39673028ca5f90b2b.
> 2021-12-02 22:07:04.151 [flink-akka.actor.default-dispatcher-60] DEBUG 
> o.a.f.r.resourcemanager.StandaloneResourceManager - Received heartbeat from 
> b2cbed7e-3290-4bd3-b24e-2ad1cf07284d.
> 2021-12-02 22:07:04.151 [flink-akka.actor.default-dispatcher-60] DEBUG 
> o.a.f.r.r.slotmanager.SlotManagerImpl - Received slot report from instance 
> 6b5347bb15142ab9c92fb761ff8d9315: 
> SlotReport{slotsStatus=[SlotStatus{slotID=b2cbed7e-3290-4bd3-b24e-2ad1cf07284d_0,
>  resourceProfile=ResourceProfile{managedMemory=128.000mb (134217728 bytes), 
> networkMemory=64.000mb (67108864 bytes)}, 
> allocationID=2a311c55f742a7a398260555f9d5c775, 
> jobID=d5a71c0aca220cf880f24bbac4fc7c8f}]}.
> 2021-12-02 22:07:04.225 [flink-akka.actor.default-dispatcher-60] DEBUG 
> o.a.f.r.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
> 2021-12-02 22:07:04.225 [flink-akka.actor.default-dispatcher-60] DEBUG 
> org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat request 
> from 2859ebe1eeb6acc39673028ca5f90b2b.
> 2021-12-02 22:07:04.225 [flink-akka.actor.default-dispatcher-68] DEBUG 
> o.a.f.r.resourcemanager.StandaloneResourceManager - Received heartbeat from 
> 1d01cc7299b415750d4e3ca23134e760.
> 2021-12-02 22:07:04.734 [flink-akka.actor.default-dispatcher-60] DEBUG 
> org.apache.flink.runtime.jobmaster.JobMaster - Trigger heartbeat request.
> 2021-12-02 22:07:04.735 [flink-akka.actor.default-dispatcher-68] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat 
> request from 1d01cc7299b415750d4e3ca23134e760.
> 2021-12-02 22:07:04.738 [flink-akka.actor.default-dispatcher-68] DEBUG 
> org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from 
> b2cbed7e-3290-4bd3-b24e-2ad1cf07284d.
> 2021-12-02 22:07:13.386 [flink-akka.actor.default-dispatcher-67] DEBUG 
> o.a.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Slot Pool Status:
>         status: connected to akka://flink/user/resourcemanager
>         registered TaskManagers: [b2cbed7e-3290-4bd3-b24e-2ad1cf07284d]
>         available slots: []
>         allocated slots: [[AllocatedSlot 2a311c55f742a7a398260555f9d5c775 @ 
> b2cbed7e-3290-4bd3-b24e-2ad1cf07284d @ localhost (dataPort=-1) - 0]]
>         pending requests: []
> 
> 
> As you can see, this is what the log should look like under normal 
> conditions. When the job hangs, the last thing printed is "2021-12-02 
> 22:06:55.628 [flink-akka.actor.default-dispatcher-62] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Trigger checkpoint 
> 20@1638454015627 for e2fa126a099ade3e581253235ffdc797." and after that there 
> are no further log lines at all. 
> <image.png>
> 
> 
> 
> At the moment I can't reproduce it 100% of the time, and the checkpoint code 
> is very hard to trace and debug because it uses Akka, which makes tracking 
> down the problem quite difficult. If anyone is familiar with this part, could 
> you help analyze where exactly the program is getting stuck? Or have you run 
> into the same problem?
> <jstack>
