hi, was the mapping for the ES index set up in advance?
I see this exception:

    failed to process cluster event (put-mapping) within 30s

which looks like an automatic mapping update that timed out.

On Wed, Apr 22, 2020 at 4:41 PM, Leonard Xu <xbjt...@gmail.com> wrote:
> Hi,
>
> What is the shard configuration of the pre-created index? How are the
> cluster's reallocation and relocation settings configured?
> That would be a good angle from which to start troubleshooting.
>
> Best,
> Leonard Xu
>
>> On Apr 22, 2020, at 16:10, Oliver <zzh...@foxmail.com> wrote:
>>
>> hi,
>>
>> I have a job that uses Flink to write Kafka data into ES, a pure ETL
>> process. The problem: every day after midnight, writes to ES fail, and
>> monitoring shows Kafka messages starting to pile up at the same time.
>> After restarting the job, the Kafka backlog gradually drains; if I do
>> not restart, the backlog persists.
>>
>> How should I go about troubleshooting and fixing this kind of problem?
>>
>> Flink version: 1.10
>> ES version: 6.x
>>
>> Jar used: flink-sql-connector-elasticsearch6_2.12
>>
>> Additional detail: in the minute from 00:00 to 00:01, a small amount of
>> data is written successfully, but the bulk of the writes fail. The index
>> name carries a date suffix, so midnight involves switching indices;
>> however, the next day's index is created in advance, so writes after
>> midnight do not involve creating a new index.
>>
>> The ES exception is as follows:
>>
>> 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed Elasticsearch item request: ElasticsearchException[Elasticsearch exception [type=process_cluster_event_timeout_exception, reason=failed to process cluster event (put-mapping) within 30s]]
>> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=process_cluster_event_timeout_exception, reason=failed to process cluster event (put-mapping) within 30s]
>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> flinkSQL:
>>
>> CREATE TABLE source_table (
>>     `time` VARCHAR
>>     ,`level` VARCHAR
>>     ,`thread` VARCHAR
>>     ,`class` VARCHAR
>>     ,`method` VARCHAR
>> ) WITH (
>>     'connector.type' = 'kafka',
>>     'connector.version' = 'universal',
>>     'connector.topic' = 'xxxx',
>>     'connector.startup-mode' = 'latest-offset',
>>     'connector.properties.group.id' = 'xxxx',
>>     'connector.properties.zookeeper.connect' = 'ip:2181',
>>     'connector.properties.bootstrap.servers' = 'ip:9092',
>>     'format.type' = 'json',
>>     'format.derive-schema' = 'true'
>> );
>>
>> CREATE TABLE result_table (
>>     `time` VARCHAR
>>     ,`level` VARCHAR
>>     ,`thread` VARCHAR
>>     ,`class` VARCHAR
>> ) WITH (
>>     'connector.type' = 'elasticsearch',
>>     'connector.version' = '6',
>>     'connector.hosts' = 'xxxx',
>>     'connector.index' = 'xxxx-yyyy.MM.dd',
>>     'connector.document-type' = 'doc',
>>     'update-mode' = 'append',
>>     'connector.bulk-flush.interval' = '1000',
>>     'connector.bulk-flush.backoff.type' = 'exponential',
>>     'connector.bulk-flush.backoff.max-retries' = '10',
>>     'connector.bulk-flush.backoff.delay' = '60000',
>>     'connector.failure-handler' = 'ignore',
>>     'format.type' = 'json'
>> );
>>
>> INSERT INTO result_table
>> SELECT `time`, `level`, `thread`, `class`
>> FROM source_table
>> WHERE `method` = 'xxxx';
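Following the suggestion at the top of the thread, here is a minimal sketch of how the next day's index could be pre-created with an explicit mapping, so that writes after midnight never need a put-mapping cluster event. The field names are taken from the job's SQL DDL; the `xxxx` index prefix, the `doc` type, and the idea of using `"dynamic": "strict"` are assumptions for illustration, not the poster's actual setup:

```python
import json
from datetime import datetime, timedelta

# Field names mirror the job's SQL DDL; "doc" matches the sink's
# 'connector.document-type'. With "dynamic": "strict", a document with an
# unexpected field is rejected immediately instead of queueing a put-mapping
# cluster event on the master node.
EXPLICIT_MAPPING = {
    "mappings": {
        "doc": {
            "dynamic": "strict",
            "properties": {
                "time":   {"type": "keyword"},
                "level":  {"type": "keyword"},
                "thread": {"type": "keyword"},
                "class":  {"type": "keyword"},
            },
        }
    }
}

def next_day_index(prefix: str, today: datetime) -> str:
    """Build tomorrow's index name with the yyyy.MM.dd suffix
    used by 'connector.index' in the sink DDL."""
    return f"{prefix}-{(today + timedelta(days=1)):%Y.%m.%d}"

if __name__ == "__main__":
    name = next_day_index("xxxx", datetime(2020, 4, 17))
    # A cron job could PUT this body to http://<es-host>:9200/<name>
    # shortly before midnight (via curl or an ES client).
    print(name)
    print(json.dumps(EXPLICIT_MAPPING, indent=2))
```

With the mapping fixed up front, midnight bulk requests only index documents; they no longer compete with a mapping-update task that has to wait on the master's cluster-state queue, which is what the 30s put-mapping timeout points at.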