http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java deleted file mode 100644 index 3e71c4d..0000000 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.rest; - -import java.util.Map; - -import org.apache.metamodel.MetaModelException; -import org.apache.metamodel.create.AbstractTableCreationBuilder; -import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; -import org.apache.metamodel.schema.MutableSchema; -import org.apache.metamodel.schema.MutableTable; -import org.apache.metamodel.schema.Schema; -import org.apache.metamodel.schema.Table; - -import io.searchbox.indices.mapping.PutMapping; - -final class JestElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder<JestElasticSearchUpdateCallback> { - - public JestElasticSearchCreateTableBuilder(JestElasticSearchUpdateCallback updateCallback, Schema schema, - String name) { - super(updateCallback, schema, name); - } - - @Override - public Table execute() throws MetaModelException { - final MutableTable table = getTable(); - final Map<String, ?> source = ElasticSearchUtils.getMappingSource(table); - - final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext(); - final String indexName = dataContext.getIndexName(); - - final PutMapping putMapping = new PutMapping.Builder(indexName, table.getName(), source).build(); - getUpdateCallback().execute(putMapping); - - final MutableSchema schema = (MutableSchema) getSchema(); - schema.addTable(table); - return table; - } - -}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java deleted file mode 100644 index 37e06dc..0000000 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.rest; - -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.metamodel.data.AbstractDataSet; -import org.apache.metamodel.data.DataSet; -import org.apache.metamodel.data.Row; -import org.apache.metamodel.query.SelectItem; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; - -import io.searchbox.client.JestClient; -import io.searchbox.client.JestResult; -import io.searchbox.core.SearchScroll; - -/** - * {@link DataSet} implementation for ElasticSearch - */ -final class JestElasticSearchDataSet extends AbstractDataSet { - - private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchDataSet.class); - - private final JestClient _client; - private final AtomicBoolean _closed; - - private JestResult _searchResponse; - private JsonObject _currentHit; - private int _hitIndex = 0; - - public JestElasticSearchDataSet(JestClient client, JestResult searchResponse, List<SelectItem> selectItems) { - super(selectItems); - _client = client; - _searchResponse = searchResponse; - _closed = new AtomicBoolean(false); - } - - - @Override - public void close() { - super.close(); - boolean closeNow = _closed.compareAndSet(true, false); - if (closeNow) { - final String scrollId = _searchResponse.getJsonObject().getAsJsonPrimitive("_scroll_id").getAsString(); - JestClientExecutor.execute(_client, new JestDeleteScroll.Builder(scrollId).build(), false); - } - } - - @Override - protected void finalize() throws Throwable { - super.finalize(); - if (!_closed.get()) { - logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this); - close(); - } - } - - @Override - public boolean next() { - final JsonArray hits = _searchResponse.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits"); - if (hits.size() == 0) { - // break condition for the scroll - _currentHit = null; - return false; - } - - if (_hitIndex < hits.size()) { - // pick the next hit within this search response - _currentHit = hits.get(_hitIndex).getAsJsonObject(); - _hitIndex++; - return true; - } - - final JsonPrimitive scrollId = _searchResponse.getJsonObject().getAsJsonPrimitive("_scroll_id"); - if (scrollId == null) { - // this search response is not scrollable - then it's the end. - _currentHit = null; - return false; - } - - // try to scroll to the next set of hits - final SearchScroll scroll = new SearchScroll.Builder(scrollId.getAsString(), ElasticSearchRestDataContext.TIMEOUT_SCROLL).build(); - - _searchResponse = JestClientExecutor.execute(_client, scroll); - - // start over (recursively) - _hitIndex = 0; - return next(); - } - - @Override - public Row getRow() { - if (_currentHit == null) { - return null; - } - - final JsonObject source = _currentHit.getAsJsonObject("_source"); - final String documentId = _currentHit.get("_id").getAsString(); - return JestElasticSearchUtils.createRow(source, documentId, getHeader()); - - } -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java deleted file mode 100644 index cc1c3e7..0000000 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.rest; - -import io.searchbox.core.DeleteByQuery; -import org.apache.metamodel.MetaModelException; -import org.apache.metamodel.delete.AbstractRowDeletionBuilder; -import org.apache.metamodel.delete.RowDeletionBuilder; -import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; -import org.apache.metamodel.query.FilterItem; -import org.apache.metamodel.query.LogicalOperator; -import org.apache.metamodel.schema.Table; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.search.builder.SearchSourceBuilder; - -import java.util.List; - -/** - * {@link RowDeletionBuilder} implementation for - * {@link ElasticSearchRestDataContext}. - */ -final class JestElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder { - private final JestElasticSearchUpdateCallback _updateCallback; - - public JestElasticSearchDeleteBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) { - super(table); - _updateCallback = updateCallback; - } - - @Override - public void execute() throws MetaModelException { - final Table table = getTable(); - final String documentType = table.getName(); - - final ElasticSearchRestDataContext dataContext = _updateCallback.getDataContext(); - final String indexName = dataContext.getIndexName(); - - final List<FilterItem> whereItems = getWhereItems(); - - // delete by query - note that creteQueryBuilderForSimpleWhere may - // return matchAllQuery() if no where items are present. - final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, - LogicalOperator.AND); - if (queryBuilder == null) { - // TODO: The where items could not be pushed down to a query. We - // could solve this by running a query first, gather all - // document IDs and then delete by IDs. - throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: " - + whereItems); - } - final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(queryBuilder); - - final DeleteByQuery deleteByQuery = - new DeleteByQuery.Builder(searchSourceBuilder.toString()).addIndex(indexName).addType( - documentType).build(); - - _updateCallback.execute(deleteByQuery); - } -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java deleted file mode 100644 index 8a1ac71..0000000 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.rest; - -import org.apache.metamodel.MetaModelException; -import org.apache.metamodel.drop.AbstractTableDropBuilder; -import org.apache.metamodel.drop.TableDropBuilder; -import org.apache.metamodel.schema.MutableSchema; -import org.apache.metamodel.schema.Table; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.searchbox.indices.mapping.DeleteMapping; - -/** - * {@link TableDropBuilder} for dropping tables (document types) in an - * ElasticSearch index. - */ -final class JestElasticSearchDropTableBuilder extends AbstractTableDropBuilder { - - private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchDropTableBuilder.class); - - private final JestElasticSearchUpdateCallback _updateCallback; - - public JestElasticSearchDropTableBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) { - super(table); - _updateCallback = updateCallback; - } - - @Override - public void execute() throws MetaModelException { - - final ElasticSearchRestDataContext dataContext = _updateCallback.getDataContext(); - final Table table = getTable(); - final String documentType = table.getName(); - logger.info("Deleting mapping / document type: {}", documentType); - - final DeleteMapping deleteIndex = new DeleteMapping.Builder(dataContext.getIndexName(), documentType).build(); - - _updateCallback.execute(deleteIndex); - - final MutableSchema schema = (MutableSchema) table.getSchema(); - schema.removeTable(table); - - } -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java deleted file mode 100644 index 746538d..0000000 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.rest; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.metamodel.MetaModelException; -import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; -import org.apache.metamodel.insert.AbstractRowInsertionBuilder; -import org.apache.metamodel.schema.Column; -import org.apache.metamodel.schema.Table; - -import io.searchbox.core.Index; -import io.searchbox.params.Parameters; - -final class JestElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<JestElasticSearchUpdateCallback> { - - public JestElasticSearchInsertBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) { - super(updateCallback, table); - } - - @Override - public void execute() throws MetaModelException { - final JestElasticSearchUpdateCallback updateCallback = getUpdateCallback(); - final ElasticSearchRestDataContext dataContext = updateCallback.getDataContext(); - final String indexName = dataContext.getIndexName(); - final String documentType = getTable().getName(); - - final Map<String, Object> source = new HashMap<>(); - final Column[] columns = getColumns(); - final Object[] values = getValues(); - String id = null; - for (int i = 0; i < columns.length; i++) { - if (isSet(columns[i])) { - final String columnName = columns[i].getName(); - - final Object value = values[i]; - if (ElasticSearchRestDataContext.FIELD_ID.equals(columnName)) { - if (value != null) { - id = value.toString(); - } - } else { - final String fieldName = ElasticSearchUtils.getValidatedFieldName(columnName); - source.put(fieldName, value); - } - } - } - - assert !source.isEmpty(); - - final Index index = new Index.Builder(source).index(indexName).type(documentType).id(id).setParameter( - Parameters.OP_TYPE, "create").build(); - - getUpdateCallback().execute(index); - } - -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java deleted file mode 100644 index 074de2e..0000000 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.rest; - -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; - -import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData; -import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; -import org.apache.metamodel.schema.ColumnType; - -import java.util.Map.Entry; - -/** - * Parser that transforms the ElasticSearch metadata response (json-like format) - * into an ElasticSearchMetaData object. - */ -final class JestElasticSearchMetaDataParser { - - /** - * Parses the ElasticSearch meta data info into an ElasticSearchMetaData - * object. This method makes much easier to create the ElasticSearch schema. - * - * @param metaDataInfo - * ElasticSearch mapping metadata in Map format - * @return An ElasticSearchMetaData object - */ - public static ElasticSearchMetaData parse(JsonObject metaDataInfo) { - final int columns = metaDataInfo.entrySet().size() + 1; - final String[] fieldNames = new String[columns]; - final ColumnType[] columnTypes = new ColumnType[columns]; - - // add the document ID field (fixed) - fieldNames[0] = ElasticSearchRestDataContext.FIELD_ID; - columnTypes[0] = ColumnType.STRING; - - int i = 1; - for (Entry<String, JsonElement> metaDataField : metaDataInfo.entrySet()) { - JsonElement fieldMetadata = metaDataField.getValue(); - - fieldNames[i] = metaDataField.getKey(); - columnTypes[i] = getColumnTypeFromMetadataField(fieldMetadata); - i++; - - } - return new ElasticSearchMetaData(fieldNames, columnTypes); - } - - private static ColumnType getColumnTypeFromMetadataField(JsonElement fieldMetadata) { - final JsonElement typeElement = ((JsonObject) fieldMetadata).get("type"); - if (typeElement != null) { - String metaDataFieldType = typeElement.getAsString(); - - return ElasticSearchUtils.getColumnTypeFromElasticSearchType(metaDataFieldType); - } else { - return ColumnType.STRING; - } - } -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java deleted file mode 100644 index a61280a..0000000 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.rest; - -import java.util.List; - -import org.apache.metamodel.AbstractUpdateCallback; -import org.apache.metamodel.MetaModelException; -import org.apache.metamodel.UpdateCallback; -import org.apache.metamodel.create.TableCreationBuilder; -import org.apache.metamodel.delete.RowDeletionBuilder; -import org.apache.metamodel.drop.TableDropBuilder; -import org.apache.metamodel.insert.RowInsertionBuilder; -import org.apache.metamodel.schema.Schema; -import org.apache.metamodel.schema.Table; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.searchbox.action.Action; -import io.searchbox.action.BulkableAction; -import io.searchbox.client.JestResult; -import io.searchbox.core.Bulk; -import io.searchbox.core.Bulk.Builder; -import io.searchbox.core.BulkResult; -import io.searchbox.core.BulkResult.BulkResultItem; -import io.searchbox.indices.Refresh; - -/** - * {@link UpdateCallback} implementation for - * {@link ElasticSearchRestDataContext}. - */ -final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback { - - private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchUpdateCallback.class); - - private static final int BULK_BUFFER_SIZE = 1000; - - private Bulk.Builder bulkBuilder; - private int bulkActionCount = 0; - private final boolean isBatch; - - public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext, boolean isBatch) { - super(dataContext); - this.isBatch = isBatch; - } - - private boolean isBatch() { - return isBatch; - } - - @Override - public ElasticSearchRestDataContext getDataContext() { - return (ElasticSearchRestDataContext) super.getDataContext(); - } - - @Override - public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException, - IllegalStateException { - return new JestElasticSearchCreateTableBuilder(this, schema, name); - } - - @Override - public boolean isDropTableSupported() { - return true; - } - - @Override - public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException, - UnsupportedOperationException { - return new JestElasticSearchDropTableBuilder(this, table); - } - - @Override - public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException, - UnsupportedOperationException { - return new JestElasticSearchInsertBuilder(this, table); - } - - @Override - public boolean isDeleteSupported() { - return true; - } - - @Override - public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException, - UnsupportedOperationException { - return new JestElasticSearchDeleteBuilder(this, table); - } - - public void onExecuteUpdateFinished() { - if (isBatch()) { - flushBulkActions(); - } - - final String indexName = getDataContext().getIndexName(); - final Refresh refresh = new Refresh.Builder().addIndex(indexName).build(); - - JestClientExecutor.execute(getDataContext().getElasticSearchClient(), refresh, false); - } - - private void flushBulkActions() { - if (bulkBuilder == null || bulkActionCount == 0) { - // nothing to flush - return; - } - final Bulk bulk = getBulkBuilder().build(); - logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount, getDataContext().getIndexName()); - executeBlocking(bulk); - - bulkActionCount = 0; - bulkBuilder = null; - } - - public void execute(Action<?> action) { - if (isBatch() && action instanceof BulkableAction) { - final Bulk.Builder bulkBuilder = getBulkBuilder(); - bulkBuilder.addAction((BulkableAction<?>) action); - bulkActionCount++; - if (bulkActionCount == BULK_BUFFER_SIZE) { - flushBulkActions(); - } - } else { - executeBlocking(action); - } - } - - private void executeBlocking(Action<?> action) { - final JestResult result = JestClientExecutor.execute(getDataContext().getElasticSearchClient(), action); - if (!result.isSucceeded()) { - if (result instanceof BulkResult) { - final List<BulkResultItem> failedItems = ((BulkResult) result).getFailedItems(); - for (int i = 0; i < failedItems.size(); i++) { - final BulkResultItem failedItem = failedItems.get(i); - logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", i+1, failedItems.size(), failedItem.id, failedItem.operation, failedItem.status, failedItem.error); - } - } - throw new MetaModelException(result.getResponseCode() + " - " + result.getErrorMessage()); - } - } - - private Builder getBulkBuilder() { - if (bulkBuilder == null) { - bulkBuilder = new Bulk.Builder(); - bulkBuilder.defaultIndex(getDataContext().getIndexName()); - } - return bulkBuilder; - } -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java deleted file mode 100644 index 11a79b7..0000000 --- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch.rest; - -import java.util.Date; -import java.util.List; -import java.util.Map; - -import org.apache.metamodel.data.DataSetHeader; -import org.apache.metamodel.data.DefaultRow; -import org.apache.metamodel.data.Row; -import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter; -import org.apache.metamodel.query.SelectItem; -import org.apache.metamodel.schema.Column; -import org.apache.metamodel.schema.ColumnType; -import org.apache.metamodel.util.NumberComparator; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; - -/** - * Shared/common util functions for the ElasticSearch MetaModel module. - */ -final class JestElasticSearchUtils { - public static Row createRow(JsonObject source, String documentId, DataSetHeader header) { - final Object[] values = new Object[header.size()]; - for (int i = 0; i < values.length; i++) { - final SelectItem selectItem = header.getSelectItem(i); - final Column column = selectItem.getColumn(); - - assert column != null; - assert !selectItem.hasFunction(); - - if (column.isPrimaryKey()) { - values[i] = documentId; - } else { - values[i] = getDataFromColumnType(source.get(column.getName()), column.getType()); - } - } - - return new DefaultRow(header, values); - } - - private static Object getDataFromColumnType(JsonElement field, ColumnType type) { - if (field == null || field.isJsonNull()) { - return null; - } - - if (field.isJsonObject()) { - return new Gson().fromJson(field, Map.class); - } - if (field.isJsonArray()) { - return new Gson().fromJson(field, List.class); - } - - if (type.isNumber()) { - // Pretty terrible workaround to avoid LazilyParsedNumber - // (which is happily output, but not recognized by Jest/GSON). - return NumberComparator.toNumber(field.getAsString()); - } else if (type.isTimeBased()) { - final Date valueToDate = ElasticSearchDateConverter.tryToConvert(field.getAsString()); - if (valueToDate == null) { - return field.getAsString(); - } else { - return valueToDate; - } - } else if (type.isBoolean()) { - return field.getAsBoolean(); - } else { - return field.getAsString(); - } - } -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java new file mode 100644 index 0000000..4705585 --- /dev/null +++ b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.http.HttpHost; +import org.apache.metamodel.BatchUpdateScript; +import org.apache.metamodel.UpdateCallback; +import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.factory.DataContextFactory; +import org.apache.metamodel.factory.DataContextPropertiesImpl; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.ColumnType; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RestClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ElasticSearchRestDataContexFactoryIT { + private static final String INDEX_NAME = "myindex"; + + private static ElasticSearchRestClient externalClient; + + private String dockerHostAddress; + + private DataContextFactory factory; + + @Before + public void setUp() throws Exception { + dockerHostAddress = ElasticSearchRestDataContextIT.determineHostName(); + + externalClient = new ElasticSearchRestClient(RestClient.builder(new HttpHost(dockerHostAddress, 9200)).build()); + + final Map<String, Object> source = new LinkedHashMap<>(); + source.put("mytext", "dummy"); + + final IndexRequest indexRequest = new IndexRequest(INDEX_NAME, "text"); + indexRequest.source(source); + + externalClient.index(indexRequest); + + factory = new ElasticSearchRestDataContextFactory(); + } + + @After + public void tearDown() throws IOException { + externalClient.delete(INDEX_NAME); + } + + @Test + public void testAccepts() throws Exception { + final DataContextPropertiesImpl properties = new DataContextPropertiesImpl(); + properties.setDataContextType("elasticsearch"); + properties.put(DataContextPropertiesImpl.PROPERTY_URL, "http://" + dockerHostAddress + ":9200"); + properties.put(DataContextPropertiesImpl.PROPERTY_DATABASE, INDEX_NAME); + + assertTrue(factory.accepts(properties, null)); + } + + @Test + public void testCreateContextAndBulkScript() throws Exception { + final DataContextPropertiesImpl properties = new DataContextPropertiesImpl(); + properties.setDataContextType("es-rest"); + properties.put(DataContextPropertiesImpl.PROPERTY_URL, "http://" + dockerHostAddress + ":9200"); + properties.put(DataContextPropertiesImpl.PROPERTY_DATABASE, INDEX_NAME); + + assertTrue(factory.accepts(properties, null)); + + final ElasticSearchRestDataContext dataContext = (ElasticSearchRestDataContext) factory.create(properties, + null); + + dataContext.executeUpdate(new BatchUpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.createTable(INDEX_NAME, "persons") + .withColumn("name").ofType(ColumnType.STRING) + .withColumn("age").ofType(ColumnType.INTEGER) + .execute(); + } + }); + + dataContext.executeUpdate(new BatchUpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto("persons").value("name", "John Doe").value("age", 42).execute(); + callback.insertInto("persons").value("name", "Jane Doe").value("age", 41).execute(); + } + }); + + dataContext.refreshSchemas(); + + final DataSet persons = dataContext.executeQuery("SELECT name, age FROM persons"); + final List<Row> personData = persons.toRows(); + + assertEquals(2, personData.size()); + + // Sort person data, so we can validate each row's values. + Column ageColumn = dataContext.getSchemaByName(INDEX_NAME).getTableByName("persons").getColumnByName("age"); + personData.sort((row1, row2) -> ((Integer) row1.getValue(ageColumn)).compareTo(((Integer) row2.getValue( + ageColumn)))); + + assertThat(Arrays.asList(personData.get(0).getValues()), containsInAnyOrder("Jane Doe", 41)); + assertThat(Arrays.asList(personData.get(1).getValues()), containsInAnyOrder("John Doe", 42)); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java new file mode 100644 index 0000000..7d5eb12 --- /dev/null +++ b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java @@ -0,0 +1,535 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.*; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import javax.swing.table.TableModel; + +import org.apache.http.HttpHost; +import org.apache.metamodel.MetaModelHelper; +import org.apache.metamodel.UpdateCallback; +import org.apache.metamodel.UpdateScript; +import org.apache.metamodel.UpdateableDataContext; +import org.apache.metamodel.create.CreateTable; +import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.data.DataSetTableModel; +import org.apache.metamodel.data.InMemoryDataSet; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.delete.DeleteFrom; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.query.FunctionType; +import org.apache.metamodel.query.Query; +import org.apache.metamodel.query.SelectItem; +import org.apache.metamodel.query.parser.QueryParserException; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.ColumnType; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.schema.Table; +import org.apache.metamodel.update.Update; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ElasticSearchRestDataContextIT { + private static final String DEFAULT_DOCKER_HOST_NAME = "localhost"; + + private static final String indexName = "twitter"; + private static final String indexType1 = "tweet1"; + private static final String indexType2 = "tweet2"; + private static final String bulkIndexType = "bulktype"; + private static final String peopleIndexType = "peopletype"; + + private static ElasticSearchRestClient client; + + private static UpdateableDataContext dataContext; + + public static String determineHostName() throws URISyntaxException { + final String dockerHost = System.getenv("DOCKER_HOST"); + + if (dockerHost == null) { + // If no value is returned for the DOCKER_HOST environment variable fall back to a default. + return DEFAULT_DOCKER_HOST_NAME; + } else { + return (new URI(dockerHost)).getHost(); + } + } + + @Before + public void setUp() throws Exception { + final String dockerHostAddress = determineHostName(); + + client = new ElasticSearchRestClient(RestClient.builder(new HttpHost(dockerHostAddress, 9200)).build()); + + indexTweeterDocument(indexType1, 1); + indexTweeterDocument(indexType2, 1); + indexTweeterDocument(indexType2, 2, null); + insertPeopleDocuments(); + indexTweeterDocument(indexType2, 1); + indexBulkDocuments(indexName, bulkIndexType, 10); + + client.refresh(indexName); + + dataContext = new ElasticSearchRestDataContext(client, indexName); + } + + @After + public void tearDown() throws IOException { + client.delete(indexName); + } + + private static void insertPeopleDocuments() throws IOException { + indexOnePeopleDocument("female", 20, 5); + indexOnePeopleDocument("female", 17, 8); + indexOnePeopleDocument("female", 18, 9); + indexOnePeopleDocument("female", 19, 10); + indexOnePeopleDocument("female", 20, 11); + indexOnePeopleDocument("male", 19, 1); + indexOnePeopleDocument("male", 17, 2); + indexOnePeopleDocument("male", 18, 3); + indexOnePeopleDocument("male", 18, 4); + } + + @Test + public void testSimpleQuery() throws Exception { + assertEquals("[bulktype, peopletype, tweet1, tweet2]", + Arrays.toString(dataContext.getDefaultSchema().getTableNames().toArray())); + + Table table = dataContext.getDefaultSchema().getTableByName("tweet1"); + + assertThat(table.getColumnNames(), containsInAnyOrder("_id", "message", "postDate", "user")); + + assertEquals(ColumnType.STRING, table.getColumnByName("user").getType()); + assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType()); + assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType()); + + try (DataSet ds = dataContext.query().from(indexType1).select("user").and("message").execute()) { + assertEquals(ElasticSearchRestDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[user1, 1]]", ds.getRow().toString()); + } + } + + @Test + public void testDocumentIdAsPrimaryKey() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName("tweet2"); + Column[] pks = table.getPrimaryKeys().toArray(new Column[0]); + assertEquals(1, pks.length); + assertEquals("_id", pks[0].getName()); + + try (DataSet ds = dataContext.query().from(table).select("user", "_id").orderBy("_id").asc().execute()) { + assertTrue(ds.next()); + assertEquals("Row[values=[user1, tweet_tweet2_1]]", ds.getRow().toString()); + } + } + + @Test + public void testExecutePrimaryKeyLookupQuery() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName("tweet2"); + Column[] pks = table.getPrimaryKeys().toArray(new Column[0]); + + try (DataSet ds = dataContext.query().from(table).selectAll().where(pks[0]).eq("tweet_tweet2_1").execute()) { + assertTrue(ds.next()); + Object dateValue = ds.getRow().getValue(1); + assertEquals("Row[values=[tweet_tweet2_1, " + dateValue + ", 1, user1]]", ds.getRow().toString()); + + assertFalse(ds.next()); + + assertEquals(InMemoryDataSet.class, ds.getClass()); + } + } + + @Test + public void testDateIsHandledAsDate() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName("tweet1"); + Column column = table.getColumnByName("postDate"); + ColumnType type = column.getType(); + assertEquals(ColumnType.DATE, type); + + DataSet dataSet = dataContext.query().from(table).select(column).execute(); + while (dataSet.next()) { + Object value = dataSet.getRow().getValue(column); + assertTrue("Got class: " + value.getClass() + ", expected Date (or subclass)", value instanceof Date); + } + } + + @Test + public void testNumberIsHandledAsNumber() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); + Column column = table.getColumnByName("age"); + ColumnType type = column.getType(); + assertEquals(ColumnType.BIGINT, type); + + DataSet dataSet = dataContext.query().from(table).select(column).execute(); + while (dataSet.next()) { + Object value = dataSet.getRow().getValue(column); + assertTrue("Got class: " + value.getClass() + ", expected Number (or subclass)", value instanceof Number); + } + } + + @Test + public void testCreateTableAndInsertQuery() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + assertNotNull(table); + assertEquals("[" + ElasticSearchUtils.FIELD_ID + ", foo, bar]", Arrays.toString(table.getColumnNames().toArray())); + + final Column fooColumn = table.getColumnByName("foo"); + final Column idColumn = table.getPrimaryKeys().get(0); + assertEquals("Column[name=_id,columnNumber=0,type=STRING,nullable=null,nativeType=null,columnSize=null]", + idColumn.toString()); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.refreshSchemas(); + + + try (DataSet ds = dataContext.query().from(table).selectAll().orderBy("bar").execute()) { + assertTrue(ds.next()); + assertEquals("hello", ds.getRow().getValue(fooColumn).toString()); + assertNotNull(ds.getRow().getValue(idColumn)); + assertTrue(ds.next()); + assertEquals("world", ds.getRow().getValue(fooColumn).toString()); + assertNotNull(ds.getRow().getValue(idColumn)); + assertFalse(ds.next()); + } + } + + @Test + public void testDeleteAll() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.executeUpdate(new DeleteFrom(table)); + + Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount() + .toQuery()); + assertEquals("Count is wrong", 0, ((Number) row.getValue(0)).intValue()); + } + + @Test + public void testDeleteByQuery() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.executeUpdate(new DeleteFrom(table).where("foo").eq("hello").where("bar").eq(42)); + + Row row = MetaModelHelper.executeSingleRowQuery(dataContext, + dataContext.query().from(table).select("foo", "bar").toQuery()); + assertEquals("Row[values=[world, 43]]", row.toString()); + } + + @Test + public void testDeleteUnsupportedQueryType() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + // greater than is not supported + try { + dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40)); + fail("Exception expected"); + } catch (UnsupportedOperationException e) { + assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]", e + .getMessage()); + } + } + + @Test + public void testUpdateRow() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42)); + + DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute(); + assertTrue(dataSet.next()); + assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString()); + assertTrue(dataSet.next()); + assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString()); + assertFalse(dataSet.next()); + dataSet.close(); + } + + @Test + public void testWhereColumnEqualsValues() throws Exception { + try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .isEquals("user4").execute()) { + assertEquals(ElasticSearchRestDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[user4, 4]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereColumnIsNullValues() throws Exception { + try (DataSet ds = dataContext.query().from(indexType2).select("message").where("postDate") + .isNull().execute()) { + assertEquals(ElasticSearchRestDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[2]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereColumnIsNotNullValues() throws Exception { + try (DataSet ds = dataContext.query().from(indexType2).select("message").where("postDate") + .isNotNull().execute()) { + assertEquals(ElasticSearchRestDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[1]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereMultiColumnsEqualValues() throws Exception { + try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .isEquals("user4").and("message").ne(5).execute()) { + assertEquals(ElasticSearchRestDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[user4, 4]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereColumnInValues() throws Exception { + try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .in("user4", "user5").orderBy("message").execute()) { + assertTrue(ds.next()); + + String row1 = ds.getRow().toString(); + assertEquals("Row[values=[user4, 4]]", row1); + assertTrue(ds.next()); + + String row2 = ds.getRow().toString(); + assertEquals("Row[values=[user5, 5]]", row2); + + assertFalse(ds.next()); + } + } + + @Test + public void testGroupByQuery() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); + + Query q = new Query(); + q.from(table); + q.groupBy(table.getColumnByName("gender")); + q.select(new SelectItem(table.getColumnByName("gender")), + new SelectItem(FunctionType.MAX, table.getColumnByName("age")), + new SelectItem(FunctionType.MIN, table.getColumnByName("age")), new SelectItem(FunctionType.COUNT, "*", + "total"), new SelectItem(FunctionType.MIN, table.getColumnByName("id")).setAlias("firstId")); + q.orderBy("gender"); + DataSet data = dataContext.executeQuery(q); + assertEquals( + "[peopletype.gender, MAX(peopletype.age), MIN(peopletype.age), COUNT(*) AS total, MIN(peopletype.id) AS firstId]", + Arrays.toString(data.getSelectItems().toArray())); + + assertTrue(data.next()); + assertEquals("Row[values=[female, 20, 17, 5, 5]]", data.getRow().toString()); + assertTrue(data.next()); + assertEquals("Row[values=[male, 19, 17, 4, 1]]", data.getRow().toString()); + assertFalse(data.next()); + } + + @Test + public void testFilterOnNumberColumn() { + Table table = dataContext.getDefaultSchema().getTableByName(bulkIndexType); + Query q = dataContext.query().from(table).select("user").where("message").greaterThan(7).toQuery(); + DataSet data = dataContext.executeQuery(q); + String[] expectations = new String[] { "Row[values=[user8]]", "Row[values=[user9]]" }; + + assertTrue(data.next()); + assertTrue(Arrays.asList(expectations).contains(data.getRow().toString())); + assertTrue(data.next()); + assertTrue(Arrays.asList(expectations).contains(data.getRow().toString())); + assertFalse(data.next()); + } + + @Test + public void testMaxRows() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); + Query query = new Query().from(table).select(table.getColumns()).setMaxRows(5); + DataSet dataSet = dataContext.executeQuery(query); + + TableModel tableModel = new DataSetTableModel(dataSet); + assertEquals(5, tableModel.getRowCount()); + } + + @Test + public void testCountQuery() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(bulkIndexType); + Query q = new Query().selectCount().from(table); + + List<Object[]> data = dataContext.executeQuery(q).toObjectArrays(); + assertEquals(1, data.size()); + Object[] row = data.get(0); + assertEquals(1, row.length); + assertEquals(10, ((Number) row[0]).intValue()); + } + + @Test(expected = IllegalArgumentException.class) + public void testQueryForANonExistingTable() throws Exception { + dataContext.query().from("nonExistingTable").select("user").and("message").execute(); + } + + @Test(expected = QueryParserException.class) + public void testQueryForAnExistingTableAndNonExistingField() throws Exception { + indexTweeterDocument(indexType1, 1); + dataContext.query().from(indexType1).select("nonExistingField").execute(); + } + + private static void indexBulkDocuments(final String indexName, final String indexType, final int numberOfDocuments) + throws IOException { + final BulkRequest bulkRequest = new BulkRequest(); + + for (int i = 0; i < numberOfDocuments; i++) { + final IndexRequest indexRequest = new IndexRequest(indexName, indexType, Integer.toString(i)); + indexRequest.source(buildTweeterJson(i)); + + bulkRequest.add(indexRequest); + } + + client.bulk(bulkRequest); + } + + private static void indexTweeterDocument(final String indexType, final int id, final Date date) throws IOException { + final IndexRequest indexRequest = new IndexRequest(indexName, indexType, "tweet_" + indexType + "_" + id); + indexRequest.source(buildTweeterJson(id, date)); + + client.index(indexRequest); + } + + private static void indexTweeterDocument(String indexType, int id) throws IOException { + final IndexRequest indexRequest = new IndexRequest(indexName, indexType, "tweet_" + indexType + "_" + id); + indexRequest.source(buildTweeterJson(id)); + + client.index(indexRequest); + } + + private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException { + final IndexRequest indexRequest = new IndexRequest(indexName, peopleIndexType); + indexRequest.source(buildPeopleJson(gender, age, id)); + + client.index(indexRequest); + } + + private static Map<String, Object> buildTweeterJson(int elementId) { + return buildTweeterJson(elementId, new Date()); + } + + private static Map<String, Object> buildTweeterJson(int elementId, Date date) { + Map<String, Object> map = new LinkedHashMap<>(); + map.put("user", "user" + elementId); + map.put("postDate", date); + map.put("message", elementId); + return map; + } + + private static XContentBuilder buildPeopleJson(String gender, int age, int elementId) throws IOException { + return jsonBuilder().startObject().field("gender", gender).field("age", age).field("id", elementId).endObject(); + } + +}