Re: 每天0点数据写入Elasticsearch异常且kafka数据堆积

2020-04-23 文章 zhisheng
👍👍👍

oliver yunchang  于2020年4月23日周四 上午12:32写道:

> 非常感谢Leonard Xu和zhisheng的回复
>
> > es index 的 mapping 是否提前设置好了?
> 提前设置好了,提前创建索引的mapping如下:
>   {
>   "xxx-2020.04.23": {
> "mappings": {
>   "doc": {
> "dynamic_templates": [
>   {
> "string_fields": {
>   "match": "*",
>   "match_mapping_type": "string",
>   "mapping": {
> "type": "keyword"
>   }
> }
>   }
> ],
> "properties": {
>   "cost": {
> "type": "long"
>   },
>   "result": {
> "type": "keyword"
>   }
> }
>   }
> }
>   }
> }
> 而待写入数据的字段远不止cost和result
> 查看ES官方文档对dynamic_templates的介绍:When putting new dynamic templates through
> the put mapping <
> https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html>
> API, all existing templates are overwritten.[1]
> 个人猜测是:已经设置的mapping未覆盖全数据字段、写入ES时依旧会调用put mapping API做修改,导致异常
>
> 重新调整了新索引的mapping为全字段,failed to process cluster event (put-mapping) within
> 30s异常消失了
>
> [1]
> https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html#dynamic-templates
> <
> https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html#dynamic-templates
> >
> Best,
> Oliver yunchang
>
> > 2020年4月22日 下午4:47,zhisheng  写道:
> >
> > hi,
> >
> > es index 的 mapping 是否提前设置好了?
> >
> > 我看到异常 :
> >
> >> failed to process cluster event (put-mapping) within 30s
> >
> > 像是自动建 mapping 超时了
> >
> > Leonard Xu  于2020年4月22日周三 下午4:41写道:
> >
> >> Hi,
> >>
> >> 提前创建的索引的shard配置是怎么样的?集群的reallocation、relocation配置怎样的?
> >> 可以从这方面找思路排查下看看
> >>
> >> 祝好,
> >> Leonard Xu
> >>
> >>
> >>
> >>> 在 2020年4月22日,16:10,Oliver  写道:
> >>>
> >>> hi,
> >>>
> >>>
> >>> 我有一个任务是使用flink将kafka数据写入ES,纯ETL过程,
> >>>
> >>
> 现在遇到的问题是:每天0点之后数据写入ES异常,同时监控发现kafka消息开始堆积,重启任务后,kafka消息堆积现象逐渐恢复,如果不重启则堆积问题一直存在。
> >>>
> >>>
> >>> 想咨询下这种问题应该怎么样排查和处理?
> >>>
> >>>
> >>> flink版本:1.10
> >>> ES版本:6.x 
> >>>
> >>>
> >>> 使用jar:flink-sql-connector-elasticsearch6_2.12
> >>>
> >>>
> >>> 补充:数据零点之后00:00-00:01这一分钟之间存在少量写入成功的数据,但大量数据写入失败。其中索引携带日期后缀
> >>> 所以零点涉及索引切换,不过第二天索引是提前创建,0点之后数据写入并不涉及新索引的创建
> >>>
> >>>
> >>> ES异常如下:
> >>>
> >>>
> >>> 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed
> >> Elasticsearch item request: ElasticsearchException[Elasticsearch
> exception
> >> [type=process_cluster_event_timeout_exception, reason=failed to process
> >> cluster event (put-mapping) within 30s]]org.apache.flink.
> >> elasticsearch6.shaded.org.elasticsearch.ElasticsearchException:
> >> Elasticsearch exception [type=process_cluster_event_timeout_exception,
> >> reason=failed to process cluster event (put-mapping) within 30s]
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >> .elasticsearch.client.RestClient$1.completed(RestClient.java:375)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >> .elasticsearch.client.RestClient$1.completed(RestClient.java:366)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >> .apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >>
> .apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >>
> .apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >>
> .apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
> >>>     at org.apache.flink.elasticsearch6.shaded.org
> >>
> .apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(Def

Re: 每天0点数据写入Elasticsearch异常且kafka数据堆积

2020-04-22 文章 oliver yunchang
非常感谢Leonard Xu和zhisheng的回复

> es index 的 mapping 是否提前设置好了?
提前设置好了,提前创建索引的mapping如下:
  {
  "xxx-2020.04.23": {
"mappings": {
  "doc": {
"dynamic_templates": [
  {
"string_fields": {
  "match": "*",
  "match_mapping_type": "string",
  "mapping": {
"type": "keyword"
  }
}
  }
],
"properties": {
  "cost": {
"type": "long"
  },
  "result": {
"type": "keyword"
  }
}
  }
}
  }
}
而待写入数据的字段远不止cost和result
查看ES官方文档对dynamic_templates的介绍:When putting new dynamic templates through the 
put mapping 

 API, all existing templates are overwritten.[1]
个人猜测是:已经设置的mapping未覆盖全数据字段、写入ES时依旧会调用put mapping API做修改,导致异常

重新调整了新索引的mapping为全字段,failed to process cluster event (put-mapping) within 
30s异常消失了

[1] 
https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html#dynamic-templates
 

Best,
Oliver yunchang

> 2020年4月22日 下午4:47,zhisheng  写道:
> 
> hi,
> 
> es index 的 mapping 是否提前设置好了?
> 
> 我看到异常 :
> 
>> failed to process cluster event (put-mapping) within 30s
> 
> 像是自动建 mapping 超时了
> 
> Leonard Xu  于2020年4月22日周三 下午4:41写道:
> 
>> Hi,
>> 
>> 提前创建的索引的shard配置是怎么样的?集群的reallocation、relocation配置怎样的?
>> 可以从这方面找思路排查下看看
>> 
>> 祝好,
>> Leonard Xu
>> 
>> 
>> 
>>> 在 2020年4月22日,16:10,Oliver  写道:
>>> 
>>> hi,
>>> 
>>> 
>>> 我有一个任务是使用flink将kafka数据写入ES,纯ETL过程,
>>> 
>> 现在遇到的问题是:每天0点之后数据写入ES异常,同时监控发现kafka消息开始堆积,重启任务后,kafka消息堆积现象逐渐恢复,如果不重启则堆积问题一直存在。
>>> 
>>> 
>>> 想咨询下这种问题应该怎么样排查和处理?
>>> 
>>> 
>>> flink版本:1.10
>>> ES版本:6.x 
>>> 
>>> 
>>> 使用jar:flink-sql-connector-elasticsearch6_2.12
>>> 
>>> 
>>> 补充:数据零点之后00:00-00:01这一分钟之间存在少量写入成功的数据,但大量数据写入失败。其中索引携带日期后缀
>>> 所以零点涉及索引切换,不过第二天索引是提前创建,0点之后数据写入并不涉及新索引的创建
>>> 
>>> 
>>> ES异常如下:
>>> 
>>> 
>>> 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed
>> Elasticsearch item request: ElasticsearchException[Elasticsearch exception
>> [type=process_cluster_event_timeout_exception, reason=failed to process
>> cluster event (put-mapping) within 30s]]org.apache.flink.
>> elasticsearch6.shaded.org.elasticsearch.ElasticsearchException:
>> Elasticsearch exception [type=process_cluster_event_timeout_exception,
>> reason=failed to process cluster event (put-mapping) within 30s]
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .elasticsearch.client.RestClient$1.completed(RestClient.java:375)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .elasticsearch.client.RestClient$1.completed(RestClient.java:366)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
>>>     at org.apache.flink.elasticsearch6.shaded.org
>> .apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODisp

Re: 每天0点数据写入Elasticsearch异常且kafka数据堆积

2020-04-22 文章 zhisheng
hi,

es index 的 mapping 是否提前设置好了?

我看到异常 :

> failed to process cluster event (put-mapping) within 30s

像是自动建 mapping 超时了

Leonard Xu  于2020年4月22日周三 下午4:41写道:

> Hi,
>
> 提前创建的索引的shard配置是怎么样的?集群的reallocation、relocation配置怎样的?
> 可以从这方面找思路排查下看看
>
> 祝好,
> Leonard Xu
>
>
>
> > 在 2020年4月22日,16:10,Oliver  写道:
> >
> > hi,
> >
> >
> > 我有一个任务是使用flink将kafka数据写入ES,纯ETL过程,
> >
> 现在遇到的问题是:每天0点之后数据写入ES异常,同时监控发现kafka消息开始堆积,重启任务后,kafka消息堆积现象逐渐恢复,如果不重启则堆积问题一直存在。
> >
> >
> > 想咨询下这种问题应该怎么样排查和处理?
> >
> >
> > flink版本:1.10
> > ES版本:6.x 
> >
> >
> > 使用jar:flink-sql-connector-elasticsearch6_2.12
> >
> >
> > 补充:数据零点之后00:00-00:01这一分钟之间存在少量写入成功的数据,但大量数据写入失败。其中索引携带日期后缀
> > 所以零点涉及索引切换,不过第二天索引是提前创建,0点之后数据写入并不涉及新索引的创建
> >
> >
> > ES异常如下:
> >
> >
> > 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed
> Elasticsearch item request: ElasticsearchException[Elasticsearch exception
> [type=process_cluster_event_timeout_exception, reason=failed to process
> cluster event (put-mapping) within 30s]]org.apache.flink.
> elasticsearch6.shaded.org.elasticsearch.ElasticsearchException:
> Elasticsearch exception [type=process_cluster_event_timeout_exception,
> reason=failed to process cluster event (put-mapping) within 30s]
> >     at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestClient$1.completed(RestClient.java:375)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestClient$1.completed(RestClient.java:366)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
> >     at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
> >     at java.lang.Thread.run(Thread.java:748)
> >
> >
> >
> > flinkSQL:
> > CREATE TABLE source_table (
> >   `time` VARCHAR
> >   ,`level` VARCHAR
> >   ,`thread` VARCHAR
> >   ,`class` VARCHAR
> > ) WITH (
> >    'connector.type' = 'kafka',
> >    'connector.version' = 'universal',
> >    'connector.topic' = '',

Re: 每天0点数据写入Elasticsearch异常且kafka数据堆积

2020-04-22 文章 Leonard Xu
Hi,
  
提前创建的索引的shard配置是怎么样的?集群的reallocation、relocation配置怎样的?
可以从这方面找思路排查下看看

祝好,
Leonard Xu



> 在 2020年4月22日,16:10,Oliver  写道:
> 
> hi,
> 
> 
> 我有一个任务是使用flink将kafka数据写入ES,纯ETL过程,
> 现在遇到的问题是:每天0点之后数据写入ES异常,同时监控发现kafka消息开始堆积,重启任务后,kafka消息堆积现象逐渐恢复,如果不重启则堆积问题一直存在。
> 
> 
> 想咨询下这种问题应该怎么样排查和处理?
> 
> 
> flink版本:1.10
> ES版本:6.x 
> 
> 
> 使用jar:flink-sql-connector-elasticsearch6_2.12
> 
> 
> 补充:数据零点之后00:00-00:01这一分钟之间存在少量写入成功的数据,但大量数据写入失败。其中索引携带日期后缀
> 所以零点涉及索引切换,不过第二天索引是提前创建,0点之后数据写入并不涉及新索引的创建
> 
> 
> ES异常如下:
> 
> 
> 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed Elasticsearch 
> item request: ElasticsearchException[Elasticsearch exception 
> [type=process_cluster_event_timeout_exception, reason=failed to process 
> cluster event (put-mapping) within 
> 30s]]org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException:
>  Elasticsearch exception [type=process_cluster_event_timeout_exception, 
> reason=failed to process cluster event (put-mapping) within 30s]
>     at 
> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>     at 
> org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
>     at java.lang.Thread.run(Thread.java:748)
> 
> 
> 
> flinkSQL:
> CREATE TABLE source_table (
>   `time` VARCHAR
>   ,`level` VARCHAR
>   ,`thread` VARCHAR
>   ,`class` VARCHAR
> ) WITH (
>    'connector.type' = 'kafka',
>    'connector.version' = 'universal',
>    'connector.topic' = '',
>    'connector.startup-mode' = 'latest-offset',
>    'connector.properties.group.id' = '',
>    'connector.properties.zookeeper.connect' = 'ip:2181',
>    'connector.properties.bootstrap.servers' = 'ip:9092',
>    'format.type' = 'json',
>    'format.derive-schema' = 'true'
>