Many thanks to Leonard Xu and zhisheng for the replies.
> Was the mapping of the ES index set up in advance?
Yes, it was set up in advance; the mapping of the pre-created index is as follows:
{
  "xxx-2020.04.23": {
    "mappings": {
      "doc": {
        "dynamic_templates": [
          {
            "string_fields": {
              "match": "*",
              "match_mapping_type": "string",
              "mapping": {
                "type": "keyword"
              }
            }
          }
        ],
        "properties": {
          "cost": {
            "type": "long"
          },
          "result": {
            "type": "keyword"
          }
        }
      }
    }
  }
}
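
(The JSON above is keyed by the index name because it is the response of the get mapping API; for reference, a minimal sketch of the request that returns it, using the example index name:

GET /xxx-2020.04.23/_mapping
)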
However, the data being written contains far more fields than just cost and result.
The official ES documentation on dynamic_templates says: "When putting new dynamic templates through the put mapping
<https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html>
API, all existing templates are overwritten." [1]
My guess: because the pre-set mapping did not cover all of the data fields, writes to ES still triggered put mapping API calls to modify the mapping, which caused the exception.
After adjusting the new index's mapping to cover all fields (see the sketch below), the "failed to process cluster event (put-mapping) within 30s" exception disappeared.
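
For illustration, a minimal sketch (in Kibana console syntax) of such a full-field mapping. The index name and the extra fields time/level/thread/class are assumptions taken from the Flink SQL quoted below; the real field list has to match the data actually being written:

# illustrative index name and field list; adjust to the actual data
PUT /xxx-2020.04.24
{
  "mappings": {
    "doc": {
      "dynamic_templates": [
        {
          "string_fields": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": { "type": "keyword" }
          }
        }
      ],
      "properties": {
        "cost":   { "type": "long" },
        "result": { "type": "keyword" },
        "time":   { "type": "keyword" },
        "level":  { "type": "keyword" },
        "thread": { "type": "keyword" },
        "class":  { "type": "keyword" }
      }
    }
  }
}

With every written field mapped up front, indexing no longer has to submit put-mapping cluster events, so the 30s cluster-event timeout is avoided on the write path.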
[1]
https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html#dynamic-templates
Best,
Oliver yunchang
> On Apr 22, 2020, at 4:47 PM, zhisheng <[email protected]> wrote:
>
> hi,
>
> Was the mapping of the ES index set up in advance?
>
> I see the exception:
>
>> failed to process cluster event (put-mapping) within 30s
>
> which looks like automatic mapping creation timing out.
>
> On Wed, Apr 22, 2020 at 4:41 PM, Leonard Xu <[email protected]> wrote:
>
>> Hi,
>>
>> What is the shard configuration of the pre-created index? And what are the cluster's reallocation/relocation settings?
>> Those would be worth checking as a starting point for troubleshooting.
>>
>> Best,
>> Leonard Xu
>>
>>
>>
>>> On Apr 22, 2020, at 16:10, Oliver <[email protected]> wrote:
>>>
>>> hi,
>>>
>>>
>>> I have a job that uses Flink to write Kafka data to ES, a pure ETL pipeline.
>>>
>>> The problem I am running into: every day after midnight, writes to ES fail, and monitoring shows Kafka messages starting to pile up. After restarting the job, the backlog gradually drains; without a restart, it persists.
>>>
>>>
>>> How should I troubleshoot and handle this kind of problem?
>>>
>>>
>>> Flink version: 1.10
>>> ES version: 6.x
>>>
>>>
>>> Jar used: flink-sql-connector-elasticsearch6_2.12
>>>
>>>
>>> One more detail: in the first minute after midnight (00:00-00:01), a small amount of data is written successfully, but the vast majority of writes fail. The index name carries a date suffix,
>>> so midnight involves an index switchover; however, the next day's index is created in advance, so writes after midnight do not involve creating a new index.
>>>
>>>
>>> The ES exception is as follows:
>>>
>>>
>>> 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed Elasticsearch item request: ElasticsearchException[Elasticsearch exception [type=process_cluster_event_timeout_exception, reason=failed to process cluster event (put-mapping) within 30s]]
>>> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=process_cluster_event_timeout_exception, reason=failed to process cluster event (put-mapping) within 30s]
>>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
>>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
>>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
>>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
>>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
>>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
>>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
>>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
>>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
>>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>
>>> Flink SQL:
>>> CREATE TABLE source_table (
>>> `time` VARCHAR
>>> ,`level` VARCHAR
>>> ,`thread` VARCHAR
>>> ,`class` VARCHAR
>>> ,`method` VARCHAR
>>> ) WITH (
>>> 'connector.type' = 'kafka',
>>> 'connector.version' = 'universal',
>>> 'connector.topic' = 'xxxx',
>>> 'connector.startup-mode' = 'latest-offset',
>>> 'connector.properties.group.id' = 'xxxx',
>>> 'connector.properties.zookeeper.connect' = 'ip:2181',
>>> 'connector.properties.bootstrap.servers' = 'ip:9092',
>>> 'format.type' = 'json',
>>> 'format.derive-schema' = 'true'
>>> );
>>>
>>>
>>> CREATE TABLE result_table (
>>> `time` VARCHAR
>>> ,`level` VARCHAR
>>> ,`thread` VARCHAR
>>> ,`class` VARCHAR
>>> ) WITH (
>>> 'connector.type' = 'elasticsearch',
>>> 'connector.version' = '6',
>>> 'connector.hosts' = 'xxxx',
>>> 'connector.index' = 'xxxx-yyyy.MM.dd',
>>> 'connector.document-type' = 'doc',
>>> 'update-mode' = 'append',
>>> 'connector.bulk-flush.interval' = '1000',
>>> 'connector.bulk-flush.backoff.type' = 'exponential',
>>> 'connector.bulk-flush.backoff.max-retries' = '10',
>>> 'connector.bulk-flush.backoff.delay' = '60000',
>>> 'connector.failure-handler' = 'ignore',
>>> 'format.type' = 'json'
>>> );
>>>
>>>
>>> INSERT INTO result_table
>>> SELECT
>>> `time`, `level`, `thread`, `class`
>>> FROM source_table
>>> WHERE `method`='xxxx';
>>
>>