METAMODEL-1099: Implemented DataContextFactory SPI for ElasticSearch Closes #118
Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/f35bfed0 Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/f35bfed0 Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/f35bfed0 Branch: refs/heads/5.x Commit: f35bfed081f4c54226e8b0d60a8c161e0c35dd6a Parents: ae5ec80 Author: kaspersorensen <i.am.kasper.soren...@gmail.com> Authored: Fri Jul 29 08:38:49 2016 -0700 Committer: kaspersorensen <i.am.kasper.soren...@gmail.com> Committed: Fri Jul 29 08:38:49 2016 -0700 ---------------------------------------------------------------------- CHANGES.md | 1 + .../metamodel/factory/DataContextFactory.java | 16 ++ .../ElasticSearchDataContextFactory.java | 165 +++++++++++++++++++ ....apache.metamodel.factory.DataContextFactory | 1 + .../rest/ElasticSearchRestDataContext.java | 61 ++++--- .../ElasticSearchRestDataContextFactory.java | 106 ++++++++++++ ....apache.metamodel.factory.DataContextFactory | 1 + 7 files changed, 324 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metamodel/blob/f35bfed0/CHANGES.md ---------------------------------------------------------------------- diff --git a/CHANGES.md b/CHANGES.md index 4f61177..bd2cec8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,7 @@ ### Apache MetaModel 4.5.4 (work in progress) * [METAMODEL-1099] - Created a new DataContextFactory SPI and a extensible registry of implementations based on ServiceLoader. + * [METAMODEL-1099] - Implemented DataContextFactory SPI for connectors: JDBC, CSV, ElasticSearch * [METAMODEL-1103] - Fixed a bug pertaining to anchoring of wildcards in LIKE operands. * [METAMODEL-1088] - Add support for aliases in MongoDB. * [METAMODEL-1086] - Fixed encoding issue when CsvDataContext is instantiated with InputStream. 
http://git-wip-us.apache.org/repos/asf/metamodel/blob/f35bfed0/core/src/main/java/org/apache/metamodel/factory/DataContextFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/metamodel/factory/DataContextFactory.java b/core/src/main/java/org/apache/metamodel/factory/DataContextFactory.java index b9f8e3e..1a00fa8 100644 --- a/core/src/main/java/org/apache/metamodel/factory/DataContextFactory.java +++ b/core/src/main/java/org/apache/metamodel/factory/DataContextFactory.java @@ -18,9 +18,25 @@ */ package org.apache.metamodel.factory; +import java.util.ServiceLoader; + import org.apache.metamodel.ConnectionException; import org.apache.metamodel.DataContext; +/** + * Represents a factory of {@link DataContext} objects. Factories take + * {@link DataContextProperties} and turn them into active {@link DataContext} + * instances. + * + * Multiple factories can exist in order to serve different kinds of properties, + * thereby offering a dynamic factory mechanism. The collection of factories is + * accessible via {@link DataContextFactoryRegistry}. + * + * These factories are registered via the Java {@link ServiceLoader} SPI API. So + * add a file with path + * "/META-INF/services/org.apache.metamodel.factory.DataContextFactory" in any + * JAR file in order to register another factory. 
+ */ public interface DataContextFactory { public boolean accepts(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry); http://git-wip-us.apache.org/repos/asf/metamodel/blob/f35bfed0/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java new file mode 100644 index 0000000..94359c4 --- /dev/null +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.elasticsearch.nativeclient; + +import org.apache.metamodel.ConnectionException; +import org.apache.metamodel.DataContext; +import org.apache.metamodel.factory.DataContextFactory; +import org.apache.metamodel.factory.DataContextProperties; +import org.apache.metamodel.factory.ResourceFactoryRegistry; +import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException; +import org.apache.metamodel.util.SimpleTableDef; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.ImmutableSettings.Builder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; + +/** + * Factory for ElasticSearch data context of native type. + * + * The factory will activate when DataContext type is specified as + * "elasticsearch", "es-node", "elasticsearch-node", "es-transport", + * "elasticsearch-transport". 
+ * + * This factory is configured with the following properties: + * + * <ul> + * <li>clientType (needed if datacontext type is just "elasticsearch" - must be + * either "transport" or "node")</li> + * <li>hostname (if clientType is "transport")</li> + * <li>port (if clientType is "transport")</li> + * <li>database (index name)</li> + * <li>cluster</li> + * <li>username (optional, only available if clientType is "transport")</li> + * <li>password (optional, only available if clientType is "transport")</li> + * <li>ssl (optional, only available if clientType is "transport")</li> + * <li>keystorePath (optional, only available if clientType is "transport")</li> + * <li>keystorePassword (optional, only available if clientType is "transport") + * </li> + * </ul> + */ +public class ElasticSearchDataContextFactory implements DataContextFactory { + + @Override + public boolean accepts(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) { + switch (properties.getDataContextType()) { + case "elasticsearch": + case "es-node": + case "elasticsearch-node": + case "es-transport": + case "elasticsearch-transport": + return acceptsInternal(properties); + } + return false; + } + + private boolean acceptsInternal(DataContextProperties properties) { + final String clientType = getClientType(properties); + if (clientType == null) { + return false; + } + if (!"node".equals(clientType)) { + if (properties.getHostname() == null || properties.getPort() == null) { + return false; + } + } + if (getIndex(properties) == null) { + return false; + } + if (getCluster(properties) == null) { + return false; + } + return true; + } + + private String getClientType(DataContextProperties properties) { + switch (properties.getDataContextType()) { + case "elasticsearch-node": + case "es-node": + return "node"; + case "elasticsearch-transport": + case "es-transport": + return "transport"; + } + final String clientType = (String) properties.toMap().get("clientType"); + 
return clientType; + } + + private String getIndex(DataContextProperties properties) { + final String databaseName = properties.getDatabaseName(); + if (databaseName == null) { + return (String) properties.toMap().get("index"); + } + return databaseName; + } + + private String getCluster(DataContextProperties properties) { + return (String) properties.toMap().get("cluster"); + } + + @Override + public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) + throws UnsupportedDataContextPropertiesException, ConnectionException { + final String clientType = getClientType(properties); + final Client client; + if ("node".equals(clientType)) { + client = createNodeClient(properties); + } else { + client = createTransportClient(properties); + } + final String indexName = getIndex(properties); + final SimpleTableDef[] tableDefinitions = properties.getTableDefs(); + return new ElasticSearchDataContext(client, indexName, tableDefinitions); + } + + private Client createTransportClient(DataContextProperties properties) { + final Builder settingsBuilder = ImmutableSettings.builder(); + settingsBuilder.put("name", "MetaModel"); + settingsBuilder.put("cluster.name", getCluster(properties)); + if (properties.getUsername() != null && properties.getPassword() != null) { + settingsBuilder.put("shield.user", properties.getUsername() + ":" + properties.getPassword()); + if ("true".equals(properties.toMap().get("ssl"))) { + if (properties.toMap().get("keystorePath") != null) { + settingsBuilder.put("shield.ssl.keystore.path", properties.toMap().get("keystorePath")); + settingsBuilder.put("shield.ssl.keystore.password", properties.toMap().get("keystorePassword")); + } + settingsBuilder.put("shield.transport.ssl", "true"); + } + } + final Settings settings = settingsBuilder.build(); + + final TransportClient client = new TransportClient(settings); + client.addTransportAddress(new InetSocketTransportAddress(properties.getHostname(), 
properties.getPort())); + return client; + } + + private Client createNodeClient(DataContextProperties properties) { + final Builder settingsBuilder = ImmutableSettings.builder(); + settingsBuilder.put("name", "MetaModel"); + settingsBuilder.put("shield.enabled", false); + final Settings settings = settingsBuilder.build(); + final Node node = NodeBuilder.nodeBuilder().clusterName(getCluster(properties)).client(true).settings(settings) + .node(); + return node.client(); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/f35bfed0/elasticsearch/native/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory b/elasticsearch/native/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory new file mode 100644 index 0000000..b33339b --- /dev/null +++ b/elasticsearch/native/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory @@ -0,0 +1 @@ +org.apache.metamodel.elasticsearch.nativeclient.ElasticSearchDataContextFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metamodel/blob/f35bfed0/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java index c452d7b..b55db13 100644 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java @@ -20,6 +20,7 @@ package 
org.apache.metamodel.elasticsearch.rest; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -83,7 +84,9 @@ import io.searchbox.params.Parameters; * This implementation supports either automatic discovery of a schema or manual * specification of a schema, through the {@link SimpleTableDef} class. */ -public class ElasticSearchRestDataContext extends QueryPostprocessDataContext implements DataContext, UpdateableDataContext { +public class ElasticSearchRestDataContext extends QueryPostprocessDataContext implements + DataContext, + UpdateableDataContext { private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataContext.class); @@ -95,16 +98,17 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im private final JestClient elasticSearchClient; private final String indexName; - // Table definitions that are set from the beginning, not supposed to be changed. + // Table definitions that are set from the beginning, not supposed to be + // changed. private final List<SimpleTableDef> staticTableDefinitions; // Table definitions that are discovered, these can change private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>(); /** - * Constructs a {@link ElasticSearchRestDataContext}. This constructor accepts a - * custom array of {@link SimpleTableDef}s which allows the user to define - * his own view on the indexes in the engine. + * Constructs a {@link ElasticSearchRestDataContext}. This constructor + * accepts a custom array of {@link SimpleTableDef}s which allows the user + * to define his own view on the indexes in the engine. 
* * @param client * the ElasticSearch client @@ -123,13 +127,14 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im } this.elasticSearchClient = client; this.indexName = indexName; - this.staticTableDefinitions = Arrays.asList(tableDefinitions); + this.staticTableDefinitions = (tableDefinitions == null || tableDefinitions.length == 0 ? Collections + .<SimpleTableDef> emptyList() : Arrays.asList(tableDefinitions)); this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema())); } /** - * Constructs a {@link ElasticSearchRestDataContext} and automatically detects - * the schema structure/view on all indexes (see + * Constructs a {@link ElasticSearchRestDataContext} and automatically + * detects the schema structure/view on all indexes (see * {@link #detectTable(JsonObject, String)}). * * @param client @@ -158,20 +163,20 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im final GetMapping getMapping = new GetMapping.Builder().addIndex(indexName).build(); jestResult = elasticSearchClient.execute(getMapping); } catch (Exception e) { - logger.error("Failed to retrieve mappings" , e); + logger.error("Failed to retrieve mappings", e); throw new MetaModelException("Failed to execute request for index information needed to detect schema", e); } - if(!jestResult.isSucceeded()){ + if (!jestResult.isSucceeded()) { logger.error("Failed to retrieve mappings; {}", jestResult.getErrorMessage()); throw new MetaModelException("Failed to retrieve mappings; " + jestResult.getErrorMessage()); } final List<SimpleTableDef> result = new ArrayList<>(); - final Set<Map.Entry<String, JsonElement>> mappings = - jestResult.getJsonObject().getAsJsonObject(indexName).getAsJsonObject("mappings").entrySet(); - if(mappings.size() == 0){ + final Set<Map.Entry<String, JsonElement>> mappings = jestResult.getJsonObject().getAsJsonObject(indexName) + .getAsJsonObject("mappings").entrySet(); + if (mappings.size() == 0) { logger.warn("No 
metadata returned for index name '{}' - no tables will be detected."); } else { @@ -179,7 +184,8 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im final String documentType = entry.getKey(); try { - final SimpleTableDef table = detectTable(entry.getValue().getAsJsonObject().get("properties").getAsJsonObject(), documentType); + final SimpleTableDef table = detectTable(entry.getValue().getAsJsonObject().get("properties") + .getAsJsonObject(), documentType); result.add(table); } catch (Exception e) { logger.error("Unexpected error during detectTable for document type '{}'", documentType, e); @@ -199,8 +205,8 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im /** * Performs an analysis of an available index type in an ElasticSearch - * {@link JestClient} client and tries to detect the index structure based on - * the metadata provided by the java client. + * {@link JestClient} client and tries to detect the index structure based + * on the metadata provided by the java client. 
* * @param metadataProperties * the ElasticSearch mapping @@ -210,8 +216,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im */ private static SimpleTableDef detectTable(JsonObject metadataProperties, String documentType) { final ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser.parse(metadataProperties); - return new SimpleTableDef(documentType, metaData.getColumnNames(), - metaData.getColumnTypes()); + return new SimpleTableDef(documentType, metaData.getColumnNames(), metaData.getColumnTypes()); } @Override @@ -253,10 +258,10 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im } @Override - protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems, - List<FilterItem> whereItems, int firstRow, int maxRows) { - final QueryBuilder queryBuilder = ElasticSearchUtils - .createQueryBuilderForSimpleWhere(whereItems, LogicalOperator.AND); + protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems, List<FilterItem> whereItems, + int firstRow, int maxRows) { + final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, + LogicalOperator.AND); if (queryBuilder != null) { // where clause can be pushed down to an ElasticSearch query SearchSourceBuilder searchSourceBuilder = createSearchRequest(firstRow, maxRows, queryBuilder); @@ -268,8 +273,9 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im } private SearchResult executeSearch(Table table, SearchSourceBuilder searchSourceBuilder, boolean scroll) { - Search.Builder builder = new Search.Builder(searchSourceBuilder.toString()).addIndex(getIndexName()).addType(table.getName()); - if(scroll){ + Search.Builder builder = new Search.Builder(searchSourceBuilder.toString()).addIndex(getIndexName()).addType( + table.getName()); + if (scroll) { builder.setParameter(Parameters.SCROLL, TIMEOUT_SCROLL); } @@ -277,7 +283,7 @@ public 
class ElasticSearchRestDataContext extends QueryPostprocessDataContext im SearchResult result; try { result = elasticSearchClient.execute(search); - } catch (Exception e){ + } catch (Exception e) { logger.warn("Could not execute ElasticSearch query", e); throw new MetaModelException("Could not execute ElasticSearch query", e); } @@ -286,7 +292,8 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im @Override protected DataSet materializeMainSchemaTable(Table table, Column[] columns, int maxRows) { - SearchResult searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), limitMaxRowsIsSet(maxRows)); + SearchResult searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), limitMaxRowsIsSet( + maxRows)); return new JestElasticSearchDataSet(elasticSearchClient, searchResult, columns); } @@ -341,7 +348,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im CountResult countResult; try { countResult = elasticSearchClient.execute(count); - } catch (Exception e){ + } catch (Exception e) { logger.warn("Could not execute ElasticSearch get query", e); throw new MetaModelException("Could not execute ElasticSearch get query", e); } http://git-wip-us.apache.org/repos/asf/metamodel/blob/f35bfed0/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java new file mode 100644 index 0000000..b2dc4c3 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import org.apache.metamodel.ConnectionException; +import org.apache.metamodel.DataContext; +import org.apache.metamodel.factory.DataContextFactory; +import org.apache.metamodel.factory.DataContextProperties; +import org.apache.metamodel.factory.ResourceFactoryRegistry; +import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException; +import org.apache.metamodel.util.SimpleTableDef; + +import io.searchbox.client.JestClient; +import io.searchbox.client.JestClientFactory; +import io.searchbox.client.config.HttpClientConfig; + +/** + * Factory for ElasticSearch data context of REST type. + * + * The factory will activate when DataContext type is specified as + * "elasticsearch", "es-rest" or "elasticsearch-rest". 
+ * + * This factory is configured with the following properties: + * + * <ul> + * <li>url (http or https based base URL of elasticsearch)</li> + * <li>database (index name, alternatively the "index" property)</li> + * <li>username (optional)</li> + * <li>password (optional)</li> + * </ul> + */ +public class ElasticSearchRestDataContextFactory implements DataContextFactory { + + @Override + public boolean accepts(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) { + switch (properties.getDataContextType()) { + case "elasticsearch": + // ensure that the url is http or https based to infer that this is + // a REST based connection + final String url = properties.getUrl(); + return url != null && url.startsWith("http") && acceptsInternal(properties); + case "es-rest": + case "elasticsearch-rest": + return acceptsInternal(properties); + } + return false; + } + + private boolean acceptsInternal(DataContextProperties properties) { + if (properties.getUrl() == null) { + return false; + } + if (getIndex(properties) == null) { + return false; + } + return true; + } + + private JestClient createClient(DataContextProperties properties) { + final String serverUri = properties.getUrl(); + final HttpClientConfig.Builder builder = new HttpClientConfig.Builder(serverUri); + if (properties.getUsername() != null) { + builder.defaultCredentials(properties.getUsername(), properties.getPassword()); + } + + final JestClientFactory clientFactory = new JestClientFactory(); + final HttpClientConfig httpClientConfig = new HttpClientConfig(builder); + clientFactory.setHttpClientConfig(httpClientConfig); + final JestClient client = clientFactory.getObject(); + return client; + } + + private String getIndex(DataContextProperties properties) { + final String databaseName = properties.getDatabaseName(); + if (databaseName == null) { + return (String) properties.toMap().get("index"); + } + return databaseName; + } + + @Override + public DataContext create(DataContextProperties properties, ResourceFactoryRegistry
resourceFactoryRegistry) + throws UnsupportedDataContextPropertiesException, ConnectionException { + final JestClient client = createClient(properties); + final String indexName = getIndex(properties); + final SimpleTableDef[] tableDefinitions = properties.getTableDefs(); + return new ElasticSearchRestDataContext(client, indexName, tableDefinitions); + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/f35bfed0/elasticsearch/rest/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory b/elasticsearch/rest/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory new file mode 100644 index 0000000..a8924c4 --- /dev/null +++ b/elasticsearch/rest/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory @@ -0,0 +1 @@ +org.apache.metamodel.elasticsearch.rest.ElasticSearchRestDataContextFactory \ No newline at end of file