[GitHub] flink pull request #2790: [FLINK-4491] Handle index.number_of_shards in the ...

2016-12-01 Thread ddolzan
Github user ddolzan closed the pull request at:

https://github.com/apache/flink/pull/2790


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2790: [FLINK-4491] Handle index.number_of_shards in the ...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2790#discussion_r89341178
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -244,12 +260,7 @@ public void close() {
}
 
if (hasFailure.get()) {
-   Throwable cause = failureThrowable.get();
-   if (cause != null) {
-   throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
-   } else {
-   throw new RuntimeException("An error occured in ElasticsearchSink.");
-   }
+   LOG.error("Some documents failed while indexing to Elasticsearch: " + failureThrowable.get());
--- End diff --

I would suggest also adding a debug log statement that logs the full stack trace.
Also, the other connectors have a flag that lets the user control whether an error is only logged or fails the connector. I would suggest adding that here as well.
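A minimal sketch of the suggested behavior, assuming a hypothetical `logFailuresOnly` flag (the name is borrowed from the log-or-fail switch of Flink's Kafka producer; it is not part of this PR). It uses `java.util.logging` instead of SLF4J only to stay dependency-free, so `FINE` stands in for the debug level:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch, not the PR's code: "log or fail" handling for bulk failures.
class FailureHandlingSketch {
    private static final Logger LOG = Logger.getLogger(FailureHandlingSketch.class.getName());

    // Set from the asynchronous bulk listener when a bulk request fails.
    final AtomicBoolean hasFailure = new AtomicBoolean(false);
    final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    // Hypothetical flag: true = only log failures, false = fail the sink.
    private final boolean logFailuresOnly;

    FailureHandlingSketch(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    // Keeps the first failure; later ones only set the flag again.
    void onBulkFailure(Throwable t) {
        failureThrowable.compareAndSet(null, t);
        hasFailure.set(true);
    }

    // Would be called from close(): logs or rethrows depending on the flag.
    void checkErroneous() {
        if (!hasFailure.get()) {
            return;
        }
        Throwable cause = failureThrowable.get();
        if (logFailuresOnly) {
            // One-line summary at error level ...
            LOG.severe("Some documents failed while indexing to Elasticsearch: " + cause);
            // ... and the full stack trace only at debug level (FINE in java.util.logging).
            LOG.log(Level.FINE, "Full stack trace of the indexing failure", cause);
        } else {
            // RuntimeException accepts a null cause, so no null check is needed.
            throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
        }
    }
}
```

With the flag off, the sink keeps the old fail-fast behavior; with it on, the job keeps running and the stack trace is still available at debug level.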




[GitHub] flink pull request #2790: [FLINK-4491] Handle index.number_of_shards in the ...

2016-11-17 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/2790#discussion_r88567247
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java ---
@@ -210,6 +216,122 @@ public void cancel() {
running = false;
}
}
+   
+   @Test
+   public void testTemplateCreation() throws Exception {
+   // Settings.Builder settings=Settings.settingsBuilder().put("","");
+
+   Map<String, String> config = new HashMap<>();
+   // This instructs the sink to emit after every element, otherwise they
+   // would be buffered
+   config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+   config.put("cluster.name", "my-transport-client-cluster");
--- End diff --

Check whether "cluster.name" can be replaced by ClusterName.CLUSTER_NAME_SETTING.




[GitHub] flink pull request #2790: [FLINK-4491] Handle index.number_of_shards in the ...

2016-11-17 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/2790#discussion_r88566350
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/helper/ElasticSearchHelper.java ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+package org.apache.flink.streaming.connectors.elasticsearch2.helper;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 
+ * This class manages the creation of index templates and index mapping on elasticsearch.
+ * 
+ * 
+ * Example:
+ *
+ * {@code
+ * ElasticSearchHelper esHelper = new ElasticSearchHelper(config, transports);
+ *
+ * //Create an Index Template given a name and the json structure
+ * esHelper.initTemplate(templateName, templateRequest);
+ * 
+ * //Create an Index Mapping given the Index Name, DocType and the json structure
+ * esHelper.initIndexMapping(indexName, docType, mappingsRequest);
+ *
+ * }
+ * 
+ * 
+ * The {@link Map} passed to the constructor is forwarded to Elasticsearch when
+ * creating {@link TransportClient}. The config keys can be found in the
+ * Elasticsearch documentation. An important setting is {@code cluster.name},
+ * this should be set to the name of the cluster that the sink should emit to.
+ *
+ */
+public class ElasticSearchHelper {
+
+   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
+
+   private Client client;
+   
+   private final static int DEFAULT_INDEX_SHARDS = 2;
+   private final static int DEFAULT_INDEX_REPLICAS = 0;
+
+   /**
+* Creates a new ElasticSearchHelper that connects to the cluster using a TransportClient.
+*
+* @param userConfig The map of user settings that are passed when constructing the TransportClients
+* @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient}
+*/
+   public ElasticSearchHelper(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses) {
+   client = buildElasticsearchClient(userConfig, transportAddresses);
+   }
+
+   /**
+* Build a TransportClient to connect to the cluster.
+* 
+* @param userConfig The map of user settings that are passed when constructing the TransportClients
+* @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient}
+* @return Initialized TransportClient
+*/
+   public static Client buildElasticsearchClient(Map<String, String> userConfig,
+   List<InetSocketAddress> transportAddresses) {
+   List<TransportAddress> transportNodes;
+   transportNodes = new ArrayList<>(transportAddresses.size());
+   for (InetSocketAddress address : transportAddresses) {
+   transportNodes.add(new 

[GitHub] flink pull request #2790: [FLINK-4491] Handle index.number_of_shards in the ...

2016-11-17 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/2790#discussion_r88566247
  

[GitHub] flink pull request #2790: [FLINK-4491] Handle index.number_of_shards in the ...

2016-11-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2790#discussion_r87804804
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/helper/ElasticSearchHelper.java ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+package org.apache.flink.streaming.connectors.elasticsearch2.helper;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticSearchHelper {
--- End diff --

This class needs a Javadoc that explains what it can be used for.




[GitHub] flink pull request #2790: [FLINK-4491] Handle index.number_of_shards in the ...

2016-11-11 Thread ddolzan
GitHub user ddolzan opened a pull request:

https://github.com/apache/flink/pull/2790

[FLINK-4491] Handle index.number_of_shards in the ES connector

Implemented index template and index mapping creation. The number of shards and many other index properties can be defined in the index template.

### Usage
Before creating the ElasticsearchSink, instantiate an ElasticSearchHelper:

```java
ElasticSearchHelper esHelper = new ElasticSearchHelper(config, transports);
//Create an Index Template given a name and the json structure
esHelper.initTemplate(templateName, templateRequest);
//Create an Index Mapping given the Index Name, DocType and the json structure
esHelper.initIndexMapping(indexName, docType, mappingsRequest);
```
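The `templateRequest` argument above is described only as "the json structure". Assuming it is a JSON `String`, a hypothetical helper (not part of the PR) could render a minimal template body that sets `number_of_shards`, the setting this issue is about. The values 2 and 0 used below mirror the `DEFAULT_INDEX_SHARDS` and `DEFAULT_INDEX_REPLICAS` constants in the helper diff:

```java
import java.util.Locale;

// Hypothetical helper, not part of the PR: renders a minimal index-template body.
final class TemplateJson {
    private TemplateJson() {}

    // Builds a template body that applies the given shard and replica counts
    // to every new index whose name matches `pattern`.
    static String templateBody(String pattern, int shards, int replicas) {
        if (shards < 1) {
            throw new IllegalArgumentException("number_of_shards must be >= 1");
        }
        if (replicas < 0) {
            throw new IllegalArgumentException("number_of_replicas must be >= 0");
        }
        // Naive formatting: assumes the pattern contains no quotes or backslashes.
        return String.format(Locale.ROOT,
                "{\"template\":\"%s\",\"settings\":{\"number_of_shards\":%d,\"number_of_replicas\":%d}}",
                pattern, shards, replicas);
    }
}
```

Usage would then look like `esHelper.initTemplate(templateName, TemplateJson.templateBody("te*", 2, 0))`, again assuming `initTemplate` accepts a `String`. A real implementation would rather use a JSON builder than string formatting, so that escaping is handled correctly.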

### TemplateRequest example
```json
{
  "template": "te*",
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "type1": {
      "_source": {
        "enabled": false
      },
      "properties": {
        "host_name": {
          "type": "keyword"
        },
        "created_at": {
          "type": "date",
          "format": "EEE MMM dd HH:mm:ss Z "
        }
      }
    }
  }
}
```
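The `"template": "te*"` field is a wildcard pattern: Elasticsearch applies a template's settings and mappings only when a new index is created whose name matches it, so the `number_of_shards` above would affect an index such as `test-index` but not `logs`, and never an index that already exists. A simplified sketch of that matching, assuming only `*` wildcards (this is not Elasticsearch's own matcher):

```java
// Simplified sketch of how a template's "template" pattern selects index names.
final class TemplatePatternSketch {
    private TemplatePatternSketch() {}

    // '*' matches any (possibly empty) run of characters; every other
    // character must match exactly.
    static boolean matches(String pattern, String indexName) {
        int p = 0, s = 0;          // current positions in pattern and name
        int starP = -1, starS = 0; // last '*' seen and the name position it started at
        while (s < indexName.length()) {
            if (p < pattern.length() && pattern.charAt(p) == '*') {
                starP = p++;       // remember the star, first try matching it as empty
                starS = s;
            } else if (p < pattern.length() && pattern.charAt(p) == indexName.charAt(s)) {
                p++;
                s++;
            } else if (starP >= 0) {
                p = starP + 1;     // backtrack: let the last '*' swallow one more char
                s = ++starS;
            } else {
                return false;
            }
        }
        // Trailing stars can match the empty string.
        while (p < pattern.length() && pattern.charAt(p) == '*') {
            p++;
        }
        return p == pattern.length();
    }
}
```

For example, `matches("te*", "test-index")` and `matches("te*", "te")` are true, while `matches("te*", "logs")` is false.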
### MappingRequest example
```json
{
  "mappings": {
    "user": {
      "_all": {
        "enabled": false
      },
      "properties": {
        "title": {
          "type": "string"
        },
        "name": {
          "type": "string"
        },
        "age": {
          "type": "integer"
        }
      }
    }
  }
}
```





You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ddolzan/flink issue-4491

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2790.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2790


commit bae4237b755c1522a026ec691388ec8c197e8ab8
Author: ddolzan 
Date:   2016-11-11T15:21:05Z

[FLINK-4491] Added index template and mappings creation



