The index mapping (generated from the index template) is as follows:
"xxx-2020.04.23": {
"mappings": {
"doc": {
"dynamic_templates": [
{
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
],
"properties": {
"cost": {
"type": "long"
},
"result": {
"type": "keyword"
}
}
}
}
}
The index settings are as follows:
"xxx-2020.04.23": {
"settings": {
"index": {
"routing": {
"allocation": {
"total_shards_per_node": "1"
}
},
"refresh_interval": "10s",
"number_of_shards": "2",
"provided_name": "xxx-2020.04.23",
"creation_date": "1587509965602", -- 2020/04/22 06:59:25
"number_of_replicas": "0",
"uuid": "f9OqpCmJQnyqlqTeYpt1Sg",
"version": {
"created": "6020499"
}
}
}
}
------------------ Original Message ------------------
From: "zhisheng"<[email protected]>;
Date: Wednesday, April 22, 2020, 4:47 PM
To: "user-zh"<[email protected]>;
Subject: Re: Flink job fails to write to Elasticsearch around midnight (0:00), causing a Kafka backlog [NOTE: original Chinese subject was garbled by mis-encoding; wording reconstructed from the thread body]
hi,
es index ?? mapping ??????????????????
?????????? :
> failed to process cluster event (put-mapping) within 30s
?????????? mapping ??????
Leonard Xu <[email protected]> ??2020??4??22?????? ????4:41??????
> Hi,
>
>
????????????????shard??????????????????????reallocation??relocation????????????
> ????????????????????????????
>
> ??????
> Leonard Xu
>
>
>
> > ?? 2020??4??22????16:10??Oliver <[email protected]> ??????
> >
> > hi??
> >
> >
> > ??????????????????flink??kafka????????ES????ETL??????
> >
>
??????????????????????0??????????????ES????,????????????kafka??????????????????????????kafka????????????????????????????????????????????????????
> >
> >
> > ??????????????????????????????????????
> >
> >
> > flink??????1.10
> > ES??????6.x&nbsp;
> >
> >
> > ????jar??flink-sql-connector-elasticsearch6_2.12
> >
> >
> >
??????????????????00:00-00:01????????????????????????????????????????????????????????????????????????????
> >
????????????????????????????????????????????????0??????????????????????????????????
> >
> >
> > ES??????????
> >
> >
> > 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed
> Elasticsearch item request: ElasticsearchException[Elasticsearch exception
> [type=process_cluster_event_timeout_exception, reason=failed to process
> cluster event (put-mapping) within 30s]]org.apache.flink.
> elasticsearch6.shaded.org.elasticsearch.ElasticsearchException:
> Elasticsearch exception [type=process_cluster_event_timeout_exception,
> reason=failed to process cluster event (put-mapping) within 30s]
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestClient$1.completed(RestClient.java:375)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestClient$1.completed(RestClient.java:366)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
> > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org
>
.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
> > &nbsp; &nbsp; at java.lang.Thread.run(Thread.java:748)
> >
> >
> >
> > flinkSQL:
> > CREATE TABLE source_table (
> > &nbsp; `time` VARCHAR
> > &nbsp; ,`level` VARCHAR
> > &nbsp; ,`thread` VARCHAR
> > &nbsp; ,`class` VARCHAR
> > ) WITH (
> > &nbsp; &nbsp;'connector.type' = 'kafka',
> > &nbsp; &nbsp;'connector.version' = 'universal',
> > &nbsp; &nbsp;'connector.topic' = 'xxxx',
> > &nbsp; &nbsp;'connector.startup-mode' = 'latest-offset',
> > &nbsp; &nbsp;'connector.properties.group.id' = 'xxxx',
> > &nbsp; &nbsp;'connector.properties.zookeeper.connect' =
'ip:2181',
> > &nbsp; &nbsp;'connector.properties.bootstrap.servers' =
'ip:9092',
> > &nbsp; &nbsp;'format.type' = 'json',
> > &nbsp; &nbsp;'format.derive-schema' = 'true'
> > );
> >
> >
> > CREATE TABLE result_table (
> > &nbsp; `time` VARCHAR
> > &nbsp; ,`level` VARCHAR
> > &nbsp; ,`thread` VARCHAR
> > &nbsp; ,`class` VARCHAR
> > ) WITH (
> > &nbsp; 'connector.type' = 'elasticsearch',
> > &nbsp; 'connector.version' = '6',
> > &nbsp; 'connector.hosts' = 'xxxx',
> > &nbsp; 'connector.index' = 'xxxx-yyyy.MM.dd',
> > &nbsp; 'connector.document-type' = 'doc',
> > &nbsp; 'update-mode' = 'append',
> > &nbsp; 'connector.bulk-flush.interval' = '1000',
> > &nbsp; 'connector.bulk-flush.backoff.type' = 'exponential',
> > &nbsp; 'connector.bulk-flush.backoff.max-retries' = '10',
> > &nbsp; 'connector.bulk-flush.backoff.delay' = '60000',
> > &nbsp; 'connector.failure-handler' = 'ignore',
> > &nbsp; 'format.type' = 'json'
> > );
> >
> >
> > INSERT INTO result_table
> > SELECT
> > &nbsp; &nbsp; `time`,`level`,thread,class
> > FROM source_table
> > WHERE `method`='xxxx';
>
>