hi??
??????????????????flink??kafka????????ES????ETL?????? ??????????????????????0??????????????ES????,????????????kafka??????????????????????????kafka???????????????????????????????????????????????????? ?????????????????????????????????????? flink??????1.10 ES??????6.x ????jar??flink-sql-connector-elasticsearch6_2.12 ??????????????????00:00-00:01???????????????????????????????????????????????????????????????????????????? ????????????????????????????????????????????????0?????????????????????????????????? ES?????????? 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed Elasticsearch item request: ElasticsearchException[Elasticsearch exception [type=process_cluster_event_timeout_exception, reason=failed to process cluster event (put-mapping) within 30s]]org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=process_cluster_event_timeout_exception, reason=failed to process cluster event (put-mapping) within 30s] at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510) at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421) at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135) at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198) at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653) at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549) at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580) at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621) at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:375) at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:366) at org.apache.flink.elasticsearch6.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119) at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177) at org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436) at org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326) at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265) at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) at java.lang.Thread.run(Thread.java:748) flinkSQL: CREATE TABLE source_table ( `time` VARCHAR ,`level` VARCHAR ,`thread` VARCHAR ,`class` VARCHAR ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'xxxx', 'connector.startup-mode' = 'latest-offset', 'connector.properties.group.id' = 'xxxx', 'connector.properties.zookeeper.connect' = 'ip:2181', 'connector.properties.bootstrap.servers' = 'ip:9092', 'format.type' = 'json', 'format.derive-schema' = 'true' ); CREATE TABLE result_table ( `time` VARCHAR ,`level` VARCHAR ,`thread` VARCHAR ,`class` VARCHAR ) WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6', 'connector.hosts' = 'xxxx, 'connector.index' = 'xxxx-yyyy.MM.dd', 'connector.document-type' = 'doc', 'update-mode' = 'append', 'connector.bulk-flush.interval' = '1000', 'connector.bulk-flush.backoff.type' = 'exponential', 'connector.bulk-flush.backoff.max-retries' = '10', 'connector.bulk-flush.backoff.delay' = '60000', 'connector.failure-handler' = 'ignore', 'format.type' = 'json' ); INSERT INTO result_table SELECT `time`,`level`,thread,class FROM source_table WHERE `method`='xxxx';
