[GitHub] flink pull request #2790: [FLINK-4491] Handle index.number_of_shards in the ...
Github user ddolzan closed the pull request at: https://github.com/apache/flink/pull/2790 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2790#discussion_r89341178

Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java

```java
@@ -244,12 +260,7 @@ public void close() {
 	}
 	if (hasFailure.get()) {
-		Throwable cause = failureThrowable.get();
-		if (cause != null) {
-			throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
-		} else {
-			throw new RuntimeException("An error occured in ElasticsearchSink.");
-		}
+		LOG.error("Some documents failed while indexing to Elasticsearch: " + failureThrowable.get());
```

End diff: I would suggest adding a debug log statement as well that logs the full stack trace. Also, the other connectors have a flag that allows the user to control whether an error should be logged or should fail the connector; I would suggest adding that here as well.
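The flag rmetzger suggests could look like the following minimal sketch. All names here (`FailureHandlingSketch`, `failOnError`, `checkErroneous`) are illustrative, not Flink API: when the flag is set the recorded failure fails the sink on close, otherwise it is only logged.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a user-configurable "fail on error" switch for the
// sink's close() path. Names are illustrative; logging is stubbed with
// System.err so the sketch stays self-contained.
class FailureHandlingSketch {
    private final boolean failOnError;
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    FailureHandlingSketch(boolean failOnError) {
        this.failOnError = failOnError;
    }

    // Called from the bulk-processor callback when a document fails.
    void recordFailure(Throwable t) {
        failureThrowable.compareAndSet(null, t);
    }

    // Called from close(): rethrow or just log, depending on the flag.
    void checkErroneous() {
        Throwable cause = failureThrowable.get();
        if (cause == null) {
            return;
        }
        if (failOnError) {
            throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
        }
        System.err.println("Some documents failed while indexing to Elasticsearch: " + cause);
    }
}
```

With `failOnError = false` the sink degrades to the log-only behavior of this PR's diff; with `true` it keeps the original fail-fast semantics.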
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/2790#discussion_r88567247

Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java

```java
@@ -210,6 +216,122 @@ public void cancel() {
 			running = false;
 		}
 	}
+
+	@Test
+	public void testTemplateCreation() throws Exception {
+		// Settings.Builder settings = Settings.settingsBuilder().put("", "");
+
+		Map<String, String> config = new HashMap<>();
+		// This instructs the sink to emit after every element; otherwise they
+		// would be buffered
+		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		config.put("cluster.name", "my-transport-client-cluster");
```

End diff: Try to see if "cluster.name" can be replaced by ClusterName.CLUSTER_NAME_SETTING.
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/2790#discussion_r88566350

Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/helper/ElasticSearchHelper.java

```java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2.helper;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class manages the creation of index templates and index mappings on Elasticsearch.
+ *
+ * Example:
+ *
+ * {@code
+ * ElasticSearchHelper esHelper = new ElasticSearchHelper(config, transports);
+ *
+ * // Create an index template given a name and the JSON structure
+ * esHelper.initTemplate(templateName, templateRequest);
+ *
+ * // Create an index mapping given the index name, doc type and the JSON structure
+ * esHelper.initIndexMapping(indexName, docType, mappingsRequest);
+ * }
+ *
+ * The {@link Map} passed to the constructor is forwarded to Elasticsearch when
+ * creating the {@link TransportClient}. The config keys can be found in the
+ * Elasticsearch documentation. An important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should emit to.
+ */
+public class ElasticSearchHelper {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
+
+	private Client client;
+
+	private final static int DEFAULT_INDEX_SHARDS = 2;
+	private final static int DEFAULT_INDEX_REPLICAS = 0;
+
+	/**
+	 * Creates a new ElasticSearchHelper that connects to the cluster using a TransportClient.
+	 *
+	 * @param userConfig The map of user settings that are passed when constructing the TransportClient
+	 * @param transportAddresses The Elasticsearch nodes to which to connect using a {@code TransportClient}
+	 */
+	public ElasticSearchHelper(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses) {
+		client = buildElasticsearchClient(userConfig, transportAddresses);
+	}
+
+	/**
+	 * Builds a TransportClient to connect to the cluster.
+	 *
+	 * @param userConfig The map of user settings that are passed when constructing the TransportClient
+	 * @param transportAddresses The Elasticsearch nodes to which to connect using a {@code TransportClient}
+	 * @return Initialized TransportClient
+	 */
+	public static Client buildElasticsearchClient(Map<String, String> userConfig,
+			List<InetSocketAddress> transportAddresses) {
+		List<TransportAddress> transportNodes = new ArrayList<>(transportAddresses.size());
+		for (InetSocketAddress address : transportAddresses) {
+			transportNodes.add(new
```
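The quoted diff breaks off inside the address-conversion loop of `buildElasticsearchClient`. The pattern it implements can be sketched as follows; `EsTransportAddress` is a stand-in for Elasticsearch's `InetSocketTransportAddress` so the sketch compiles without the Elasticsearch jars on the classpath:

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.elasticsearch.common.transport.InetSocketTransportAddress.
class EsTransportAddress {
    final InetSocketAddress address;

    EsTransportAddress(InetSocketAddress address) {
        this.address = address;
    }
}

class ClientBuilderSketch {
    // Wrap each InetSocketAddress in the transport-address type the
    // TransportClient expects, preserving order.
    static List<EsTransportAddress> toTransportNodes(List<InetSocketAddress> transportAddresses) {
        List<EsTransportAddress> transportNodes = new ArrayList<>(transportAddresses.size());
        for (InetSocketAddress address : transportAddresses) {
            transportNodes.add(new EsTransportAddress(address));
        }
        return transportNodes;
    }
}
```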
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/2790#discussion_r88566247

Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/helper/ElasticSearchHelper.java (same hunk as quoted above)
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2790#discussion_r87804804

Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/helper/ElasticSearchHelper.java

```java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2.helper;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticSearchHelper {
```

End diff: This class needs a javadoc that explains what it can be used for.
GitHub user ddolzan opened a pull request: https://github.com/apache/flink/pull/2790

[FLINK-4491] Handle index.number_of_shards in the ES connector

Implemented the index template and index mapping creation. The number of shards and many other properties can be defined in the index template.

### Usage

Before instantiating ElasticsearchSink, instantiate ElasticSearchHelper:

```java
ElasticSearchHelper esHelper = new ElasticSearchHelper(config, transports);

// Create an index template given a name and the JSON structure
esHelper.initTemplate(templateName, templateRequest);

// Create an index mapping given the index name, doc type and the JSON structure
esHelper.initIndexMapping(indexName, docType, mappingsRequest);
```

### TemplateRequest example

```json
{
  "template": "te*",
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "type1": {
      "_source": { "enabled": false },
      "properties": {
        "host_name": { "type": "keyword" },
        "created_at": { "type": "date", "format": "EEE MMM dd HH:mm:ss Z " }
      }
    }
  }
}
```

### MappingRequest example

```json
{
  "mappings": {
    "user": {
      "_all": { "enabled": false },
      "properties": {
        "title": { "type": "string" },
        "name": { "type": "string" },
        "age": { "type": "integer" }
      }
    }
  }
}
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ddolzan/flink issue-4491

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2790.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2790

commit bae4237b755c1522a026ec691388ec8c197e8ab8
Author: ddolzan
Date: 2016-11-11T15:21:05Z

    [FLINK-4491] Added index template and mappings creation
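The usage order the PR description prescribes (template, then mapping, then sink) can be sketched as below. The `Helper` interface and all names are stand-ins so the sketch compiles without Flink or Elasticsearch on the classpath; real code would call the PR's `ElasticSearchHelper` and then attach `ElasticsearchSink` to the stream.

```java
import java.util.ArrayList;
import java.util.List;

class IndexSetupSketch {
    // Stand-in for the subset of ElasticSearchHelper this sketch needs.
    interface Helper {
        void initTemplate(String templateName, String templateJson);
        void initIndexMapping(String indexName, String docType, String mappingJson);
    }

    // Returns the setup steps in the order they must happen.
    static List<String> setUp(Helper esHelper) {
        List<String> steps = new ArrayList<>();
        // 1. Template first: settings such as index.number_of_shards live here.
        esHelper.initTemplate("te-template",
                "{\"template\":\"te*\",\"settings\":{\"number_of_shards\":1}}");
        steps.add("template");
        // 2. Then the concrete index mapping for the doc type.
        esHelper.initIndexMapping("te-index", "user",
                "{\"properties\":{\"name\":{\"type\":\"string\"}}}");
        steps.add("mapping");
        // 3. Only now would the ElasticsearchSink be attached to the stream.
        steps.add("sink");
        return steps;
    }
}
```

The point of the ordering is that a template only applies to indices created after it exists, so it must be registered before the sink's first bulk request can auto-create the index.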