http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
deleted file mode 100644
index 3e71c4d..0000000
--- 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.Map;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.create.AbstractTableCreationBuilder;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.MutableTable;
-import org.apache.metamodel.schema.Schema;
-import org.apache.metamodel.schema.Table;
-
-import io.searchbox.indices.mapping.PutMapping;
-
-/**
- * Table creation builder for {@link ElasticSearchRestDataContext}. Executing
- * it sends a {@link PutMapping} action for the table through the update
- * callback and then registers the table in its schema.
- */
-final class JestElasticSearchCreateTableBuilder
-        extends AbstractTableCreationBuilder<JestElasticSearchUpdateCallback> {
-
-    public JestElasticSearchCreateTableBuilder(JestElasticSearchUpdateCallback updateCallback, Schema schema,
-            String name) {
-        super(updateCallback, schema, name);
-    }
-
-    /**
-     * Puts the mapping of the table being built into the index of the data
-     * context and adds the table to the schema.
-     *
-     * @return the table that was created
-     */
-    @Override
-    public Table execute() throws MetaModelException {
-        final MutableTable newTable = getTable();
-        final Map<String, ?> mapping = ElasticSearchUtils.getMappingSource(newTable);
-        final String index = getUpdateCallback().getDataContext().getIndexName();
-
-        // the table name doubles as the document type of the mapping
-        getUpdateCallback().execute(new PutMapping.Builder(index, newTable.getName(), mapping).build());
-
-        // only register the table once the mapping action has been handed over
-        ((MutableSchema) getSchema()).addTable(newTable);
-        return newTable;
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
deleted file mode 100644
index 37e06dc..0000000
--- 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.metamodel.data.AbstractDataSet;
-import org.apache.metamodel.data.DataSet;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.query.SelectItem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestResult;
-import io.searchbox.core.SearchScroll;
-
-/**
- * {@link DataSet} implementation for ElasticSearch, backed by Jest search
- * responses. Rows are served from the "hits" array of the current response;
- * once that array is used up and the response carries a "_scroll_id", the next
- * response is fetched with a {@link SearchScroll} request.
- */
-final class JestElasticSearchDataSet extends AbstractDataSet {
-
-    private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchDataSet.class);
-
-    private final JestClient _client;
-    private final AtomicBoolean _closed;
-
-    private JestResult _searchResponse;
-    private JsonObject _currentHit;
-    private int _hitIndex = 0;
-
-    /**
-     * @param client
-     *            the client used for follow-up scroll and delete-scroll requests
-     * @param searchResponse
-     *            the initial search response to read hits from
-     * @param selectItems
-     *            the select items that make up the header of this data set
-     */
-    public JestElasticSearchDataSet(JestClient client, JestResult searchResponse, List<SelectItem> selectItems) {
-        super(selectItems);
-        _client = client;
-        _searchResponse = searchResponse;
-        _closed = new AtomicBoolean(false);
-    }
-
-    /**
-     * Closes the data set. The first invocation marks the data set as closed
-     * and, if the current response has a "_scroll_id", executes a
-     * {@link JestDeleteScroll} action for it. Later invocations do nothing
-     * beyond calling the super implementation.
-     */
-    @Override
-    public void close() {
-        super.close();
-        // Flip the flag from "open" (false) to "closed" (true). The flag starts out
-        // false, so the expected value must be false for the first call to win.
-        final boolean closeNow = _closed.compareAndSet(false, true);
-        if (closeNow) {
-            // a response that is not scrollable has no scroll id, see next()
-            final JsonPrimitive scrollId = _searchResponse.getJsonObject().getAsJsonPrimitive("_scroll_id");
-            if (scrollId != null) {
-                JestClientExecutor.execute(_client, new JestDeleteScroll.Builder(scrollId.getAsString()).build(),
-                        false);
-            }
-        }
-    }
-
-    /**
-     * Safety net: logs a warning and closes the data set if it was never
-     * closed explicitly.
-     */
-    @Override
-    protected void finalize() throws Throwable {
-        super.finalize();
-        if (!_closed.get()) {
-            logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
-            close();
-        }
-    }
-
-    /**
-     * Moves to the next hit, scrolling to the next search response when the
-     * current one is used up.
-     *
-     * @return true if a hit is available through {@link #getRow()}, false if
-     *         there are no more hits
-     */
-    @Override
-    public boolean next() {
-        final JsonArray hits = _searchResponse.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits");
-        if (hits.size() == 0) {
-            // break condition for the scroll
-            _currentHit = null;
-            return false;
-        }
-
-        if (_hitIndex < hits.size()) {
-            // pick the next hit within this search response
-            _currentHit = hits.get(_hitIndex).getAsJsonObject();
-            _hitIndex++;
-            return true;
-        }
-
-        final JsonPrimitive scrollId = _searchResponse.getJsonObject().getAsJsonPrimitive("_scroll_id");
-        if (scrollId == null) {
-            // this search response is not scrollable - then it's the end.
-            _currentHit = null;
-            return false;
-        }
-
-        // try to scroll to the next set of hits
-        final SearchScroll scroll = new SearchScroll.Builder(scrollId.getAsString(),
-                ElasticSearchRestDataContext.TIMEOUT_SCROLL).build();
-
-        _searchResponse = JestClientExecutor.execute(_client, scroll);
-
-        // start over (recursively) on the freshly fetched response
-        _hitIndex = 0;
-        return next();
-    }
-
-    /**
-     * @return a row built from the "_source" and "_id" of the current hit, or
-     *         null if there is no current hit
-     */
-    @Override
-    public Row getRow() {
-        if (_currentHit == null) {
-            return null;
-        }
-
-        final JsonObject source = _currentHit.getAsJsonObject("_source");
-        final String documentId = _currentHit.get("_id").getAsString();
-        return JestElasticSearchUtils.createRow(source, documentId, getHeader());
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
deleted file mode 100644
index cc1c3e7..0000000
--- 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.core.DeleteByQuery;
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.delete.AbstractRowDeletionBuilder;
-import org.apache.metamodel.delete.RowDeletionBuilder;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.query.FilterItem;
-import org.apache.metamodel.query.LogicalOperator;
-import org.apache.metamodel.schema.Table;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-
-import java.util.List;
-
-/**
- * {@link RowDeletionBuilder} implementation for
- * {@link ElasticSearchRestDataContext}. Rows are removed with a single
- * {@link DeleteByQuery} action built from the WHERE items.
- */
-final class JestElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
-    private final JestElasticSearchUpdateCallback _updateCallback;
-
-    public JestElasticSearchDeleteBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) {
-        super(table);
-        _updateCallback = updateCallback;
-    }
-
-    /**
-     * Translates the WHERE items into a query and hands a delete-by-query
-     * action for the table's document type to the update callback.
-     *
-     * @throws UnsupportedOperationException
-     *             if the WHERE items cannot be expressed as a query
-     */
-    @Override
-    public void execute() throws MetaModelException {
-        final String type = getTable().getName();
-        final String index = _updateCallback.getDataContext().getIndexName();
-        final List<FilterItem> whereItems = getWhereItems();
-
-        // delete by query - note that createQueryBuilderForSimpleWhere may
-        // return matchAllQuery() if no where items are present.
-        final QueryBuilder query = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
-                LogicalOperator.AND);
-        if (query == null) {
-            // TODO: The where items could not be pushed down to a query. We
-            // could solve this by running a query first, gather all
-            // document IDs and then delete by IDs.
-            throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
-                    + whereItems);
-        }
-
-        final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
-        sourceBuilder.query(query);
-        final String queryJson = sourceBuilder.toString();
-
-        final DeleteByQuery.Builder requestBuilder = new DeleteByQuery.Builder(queryJson);
-        final DeleteByQuery request = requestBuilder.addIndex(index).addType(type).build();
-
-        _updateCallback.execute(request);
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
deleted file mode 100644
index 8a1ac71..0000000
--- 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.drop.AbstractTableDropBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.searchbox.indices.mapping.DeleteMapping;
-
-/**
- * {@link TableDropBuilder} for dropping tables (document types) in an
- * ElasticSearch index. Dropping is done by executing a {@link DeleteMapping}
- * action and removing the table from its schema.
- */
-final class JestElasticSearchDropTableBuilder extends AbstractTableDropBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchDropTableBuilder.class);
-
-    private final JestElasticSearchUpdateCallback _updateCallback;
-
-    public JestElasticSearchDropTableBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) {
-        super(table);
-        _updateCallback = updateCallback;
-    }
-
-    /**
-     * Deletes the mapping named after the table from the index of the data
-     * context, then removes the table from the schema it belongs to.
-     */
-    @Override
-    public void execute() throws MetaModelException {
-        final ElasticSearchRestDataContext context = _updateCallback.getDataContext();
-        final Table droppedTable = getTable();
-        final String type = droppedTable.getName();
-        logger.info("Deleting mapping / document type: {}", type);
-
-        final DeleteMapping deleteMapping = new DeleteMapping.Builder(context.getIndexName(), type).build();
-        _updateCallback.execute(deleteMapping);
-
-        // the schema is only updated after the delete action has been handed over
-        ((MutableSchema) droppedTable.getSchema()).removeTable(droppedTable);
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
deleted file mode 100644
index 746538d..0000000
--- 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.Table;
-
-import io.searchbox.core.Index;
-import io.searchbox.params.Parameters;
-
-/**
- * Row insertion builder for {@link ElasticSearchRestDataContext}. Each
- * execution produces one {@link Index} action with the "create" op type.
- */
-final class JestElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<JestElasticSearchUpdateCallback> {
-
-    public JestElasticSearchInsertBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) {
-        super(updateCallback, table);
-    }
-
-    /**
-     * Collects the values that have been set into a document source and hands
-     * an index action for it to the update callback. A non-null value of the
-     * {@link ElasticSearchRestDataContext#FIELD_ID} column is used as the
-     * document id instead of being stored in the source.
-     */
-    @Override
-    public void execute() throws MetaModelException {
-        final JestElasticSearchUpdateCallback callback = getUpdateCallback();
-        final String index = callback.getDataContext().getIndexName();
-        final String type = getTable().getName();
-
-        final Column[] columns = getColumns();
-        final Object[] values = getValues();
-        final Map<String, Object> document = new HashMap<>();
-        String documentId = null;
-
-        for (int i = 0; i < columns.length; i++) {
-            final Column column = columns[i];
-            if (!isSet(column)) {
-                // columns without an explicitly set value are left out
-                continue;
-            }
-
-            final String columnName = column.getName();
-            final Object value = values[i];
-            if (!ElasticSearchRestDataContext.FIELD_ID.equals(columnName)) {
-                document.put(ElasticSearchUtils.getValidatedFieldName(columnName), value);
-            } else if (value != null) {
-                documentId = value.toString();
-            }
-        }
-
-        assert !document.isEmpty();
-
-        final Index.Builder indexBuilder = new Index.Builder(document);
-        final Index indexAction = indexBuilder.index(index).type(type).id(documentId).setParameter(
-                Parameters.OP_TYPE, "create").build();
-
-        callback.execute(indexAction);
-    }
-
-}
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
deleted file mode 100644
index 074de2e..0000000
--- 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.schema.ColumnType;
-
-import java.util.Map.Entry;
-
-/**
- * Parser that transforms the ElasticSearch metadata response (JSON format)
- * into an {@link ElasticSearchMetaData} object.
- */
-final class JestElasticSearchMetaDataParser {
-
-    /**
-     * Parses the ElasticSearch metadata into an ElasticSearchMetaData object,
-     * which makes it much easier to create the ElasticSearch schema. The
-     * result always starts with the document ID field, followed by one field
-     * per entry of the given JSON object.
-     *
-     * @param metaDataInfo
-     *            ElasticSearch mapping metadata as a JSON object, keyed by
-     *            field name
-     * @return An ElasticSearchMetaData object
-     */
-    public static ElasticSearchMetaData parse(JsonObject metaDataInfo) {
-        // one extra slot for the fixed document ID field
-        final int size = metaDataInfo.entrySet().size() + 1;
-        final String[] names = new String[size];
-        final ColumnType[] types = new ColumnType[size];
-
-        names[0] = ElasticSearchRestDataContext.FIELD_ID;
-        types[0] = ColumnType.STRING;
-
-        int position = 0;
-        for (Entry<String, JsonElement> field : metaDataInfo.entrySet()) {
-            position++;
-            names[position] = field.getKey();
-            types[position] = getColumnTypeFromMetadataField(field.getValue());
-        }
-        return new ElasticSearchMetaData(names, types);
-    }
-
-    /**
-     * Resolves the column type from the "type" member of a field's metadata,
-     * falling back to {@link ColumnType#STRING} when there is no such member.
-     */
-    private static ColumnType getColumnTypeFromMetadataField(JsonElement fieldMetadata) {
-        final JsonElement type = ((JsonObject) fieldMetadata).get("type");
-        if (type == null) {
-            return ColumnType.STRING;
-        }
-        return ElasticSearchUtils.getColumnTypeFromElasticSearchType(type.getAsString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
deleted file mode 100644
index a61280a..0000000
--- 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.List;
-
-import org.apache.metamodel.AbstractUpdateCallback;
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.UpdateCallback;
-import org.apache.metamodel.create.TableCreationBuilder;
-import org.apache.metamodel.delete.RowDeletionBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.insert.RowInsertionBuilder;
-import org.apache.metamodel.schema.Schema;
-import org.apache.metamodel.schema.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.searchbox.action.Action;
-import io.searchbox.action.BulkableAction;
-import io.searchbox.client.JestResult;
-import io.searchbox.core.Bulk;
-import io.searchbox.core.Bulk.Builder;
-import io.searchbox.core.BulkResult;
-import io.searchbox.core.BulkResult.BulkResultItem;
-import io.searchbox.indices.Refresh;
-
-/**
- * {@link UpdateCallback} implementation for
- * {@link ElasticSearchRestDataContext}. In batch mode, bulkable actions are
- * collected in a {@link Bulk} request that is flushed every
- * {@link #BULK_BUFFER_SIZE} actions and when the update finishes; all other
- * actions are executed right away.
- */
-final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback {
-
-    private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchUpdateCallback.class);
-
-    private static final int BULK_BUFFER_SIZE = 1000;
-
-    private Bulk.Builder bulkBuilder;
-    private int bulkActionCount = 0;
-    private final boolean isBatch;
-
-    public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext, boolean isBatch) {
-        super(dataContext);
-        this.isBatch = isBatch;
-    }
-
-    private boolean isBatch() {
-        return isBatch;
-    }
-
-    @Override
-    public ElasticSearchRestDataContext getDataContext() {
-        return (ElasticSearchRestDataContext) super.getDataContext();
-    }
-
-    @Override
-    public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
-            IllegalStateException {
-        return new JestElasticSearchCreateTableBuilder(this, schema, name);
-    }
-
-    @Override
-    public boolean isDropTableSupported() {
-        return true;
-    }
-
-    @Override
-    public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
-            UnsupportedOperationException {
-        return new JestElasticSearchDropTableBuilder(this, table);
-    }
-
-    @Override
-    public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
-            UnsupportedOperationException {
-        return new JestElasticSearchInsertBuilder(this, table);
-    }
-
-    @Override
-    public boolean isDeleteSupported() {
-        return true;
-    }
-
-    @Override
-    public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
-            UnsupportedOperationException {
-        return new JestElasticSearchDeleteBuilder(this, table);
-    }
-
-    /**
-     * Flushes any buffered bulk actions (batch mode only) and then executes a
-     * {@link Refresh} action on the index of the data context.
-     */
-    public void onExecuteUpdateFinished() {
-        if (isBatch()) {
-            flushBulkActions();
-        }
-
-        final ElasticSearchRestDataContext context = getDataContext();
-        final Refresh refresh = new Refresh.Builder().addIndex(context.getIndexName()).build();
-
-        JestClientExecutor.execute(getDataContext().getElasticSearchClient(), refresh, false);
-    }
-
-    /**
-     * Executes the buffered bulk request, if there is one with at least one
-     * action, and resets the buffer afterwards.
-     */
-    private void flushBulkActions() {
-        final boolean nothingToFlush = bulkBuilder == null || bulkActionCount == 0;
-        if (nothingToFlush) {
-            return;
-        }
-
-        final Bulk bulk = getBulkBuilder().build();
-        logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount, getDataContext().getIndexName());
-        executeBlocking(bulk);
-
-        bulkBuilder = null;
-        bulkActionCount = 0;
-    }
-
-    /**
-     * Executes the action, or buffers it when in batch mode and the action is
-     * a {@link BulkableAction}. A full buffer is flushed immediately.
-     *
-     * @param action
-     *            the action to execute or buffer
-     */
-    public void execute(Action<?> action) {
-        if (!isBatch() || !(action instanceof BulkableAction)) {
-            executeBlocking(action);
-            return;
-        }
-
-        getBulkBuilder().addAction((BulkableAction<?>) action);
-        bulkActionCount++;
-        if (bulkActionCount == BULK_BUFFER_SIZE) {
-            flushBulkActions();
-        }
-    }
-
-    /**
-     * Executes the action through {@link JestClientExecutor} and throws a
-     * {@link MetaModelException} if the result is not successful. The failed
-     * items of a bulk result are logged before the exception is thrown.
-     */
-    private void executeBlocking(Action<?> action) {
-        final JestResult result = JestClientExecutor.execute(getDataContext().getElasticSearchClient(), action);
-        if (result.isSucceeded()) {
-            return;
-        }
-
-        if (result instanceof BulkResult) {
-            final List<BulkResultItem> failedItems = ((BulkResult) result).getFailedItems();
-            int itemNo = 0;
-            for (BulkResultItem failedItem : failedItems) {
-                itemNo++;
-                logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", itemNo,
-                        failedItems.size(), failedItem.id, failedItem.operation, failedItem.status,
-                        failedItem.error);
-            }
-        }
-        throw new MetaModelException(result.getResponseCode() + " - " + result.getErrorMessage());
-    }
-
-    /**
-     * @return the current bulk builder, created on demand with the index of
-     *         the data context as its default index
-     */
-    private Builder getBulkBuilder() {
-        if (bulkBuilder == null) {
-            bulkBuilder = new Bulk.Builder();
-            bulkBuilder.defaultIndex(getDataContext().getIndexName());
-        }
-        return bulkBuilder;
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java
deleted file mode 100644
index 11a79b7..0000000
--- 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.DefaultRow;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.ColumnType;
-import org.apache.metamodel.util.NumberComparator;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-/**
- * Shared/common util functions for the ElasticSearch MetaModel module.
- */
-final class JestElasticSearchUtils {
-    public static Row createRow(JsonObject source, String documentId, 
DataSetHeader header) {
-        final Object[] values = new Object[header.size()];
-        for (int i = 0; i < values.length; i++) {
-            final SelectItem selectItem = header.getSelectItem(i);
-            final Column column = selectItem.getColumn();
-
-            assert column != null;
-            assert !selectItem.hasFunction();
-
-            if (column.isPrimaryKey()) {
-                values[i] = documentId;
-            } else {
-                values[i] = 
getDataFromColumnType(source.get(column.getName()), column.getType());
-            }
-        }
-
-        return new DefaultRow(header, values);
-    }
-
-    private static Object getDataFromColumnType(JsonElement field, ColumnType 
type) {
-        if (field == null || field.isJsonNull()) {
-            return null;
-        }
-
-        if (field.isJsonObject()) {
-            return new Gson().fromJson(field, Map.class);
-        }
-        if (field.isJsonArray()) {
-            return new Gson().fromJson(field, List.class);
-        }
-
-        if (type.isNumber()) {
-            // Pretty terrible workaround to avoid LazilyParsedNumber
-            // (which is happily output, but not recognized by Jest/GSON).
-            return NumberComparator.toNumber(field.getAsString());
-        } else if (type.isTimeBased()) {
-            final Date valueToDate = 
ElasticSearchDateConverter.tryToConvert(field.getAsString());
-            if (valueToDate == null) {
-                return field.getAsString();
-            } else {
-                return valueToDate;
-            }
-        } else if (type.isBoolean()) {
-            return field.getAsBoolean();
-        } else {
-            return field.getAsString();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java
 
b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java
new file mode 100644
index 0000000..4705585
--- /dev/null
+++ 
b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.HttpHost;
+import org.apache.metamodel.BatchUpdateScript;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.factory.DataContextFactory;
+import org.apache.metamodel.factory.DataContextPropertiesImpl;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RestClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ElasticSearchRestDataContexFactoryIT {
+    private static final String INDEX_NAME = "myindex";
+
+    private static ElasticSearchRestClient externalClient;
+
+    private String dockerHostAddress;
+
+    private DataContextFactory factory;
+
+    @Before
+    public void setUp() throws Exception {
+        dockerHostAddress = ElasticSearchRestDataContextIT.determineHostName();
+
+        externalClient = new ElasticSearchRestClient(RestClient.builder(new 
HttpHost(dockerHostAddress, 9200)).build());
+
+        final Map<String, Object> source = new LinkedHashMap<>();
+        source.put("mytext", "dummy");
+
+        final IndexRequest indexRequest = new IndexRequest(INDEX_NAME, "text");
+        indexRequest.source(source);
+
+        externalClient.index(indexRequest);
+
+        factory = new ElasticSearchRestDataContextFactory();
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        externalClient.delete(INDEX_NAME);
+    }
+
+    @Test
+    public void testAccepts() throws Exception {
+        final DataContextPropertiesImpl properties = new 
DataContextPropertiesImpl();
+        properties.setDataContextType("elasticsearch");
+        properties.put(DataContextPropertiesImpl.PROPERTY_URL, "http://" + dockerHostAddress + ":9200");
+        properties.put(DataContextPropertiesImpl.PROPERTY_DATABASE, 
INDEX_NAME);
+
+        assertTrue(factory.accepts(properties, null));
+    }
+
+    @Test
+    public void testCreateContextAndBulkScript() throws Exception {
+        final DataContextPropertiesImpl properties = new 
DataContextPropertiesImpl();
+        properties.setDataContextType("es-rest");
+        properties.put(DataContextPropertiesImpl.PROPERTY_URL, "http://" + dockerHostAddress + ":9200");
+        properties.put(DataContextPropertiesImpl.PROPERTY_DATABASE, 
INDEX_NAME);
+
+        assertTrue(factory.accepts(properties, null));
+
+        final ElasticSearchRestDataContext dataContext = 
(ElasticSearchRestDataContext) factory.create(properties,
+                null);
+
+        dataContext.executeUpdate(new BatchUpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.createTable(INDEX_NAME, "persons")
+                        .withColumn("name").ofType(ColumnType.STRING)
+                        .withColumn("age").ofType(ColumnType.INTEGER)
+                        .execute();
+            }
+        });
+
+        dataContext.executeUpdate(new BatchUpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto("persons").value("name", "John Doe").value("age", 42).execute();
+                callback.insertInto("persons").value("name", "Jane Doe").value("age", 41).execute();
+            }
+        });
+
+        dataContext.refreshSchemas();
+
+        final DataSet persons = dataContext.executeQuery("SELECT name, age FROM persons");
+        final List<Row> personData = persons.toRows();
+
+        assertEquals(2, personData.size());
+
+        // Sort person data, so we can validate each row's values.
+        Column ageColumn = 
dataContext.getSchemaByName(INDEX_NAME).getTableByName("persons").getColumnByName("age");
+        personData.sort((row1, row2) -> ((Integer) 
row1.getValue(ageColumn)).compareTo(((Integer) row2.getValue(
+                ageColumn))));
+
+        assertThat(Arrays.asList(personData.get(0).getValues()), 
containsInAnyOrder("Jane Doe", 41));
+        assertThat(Arrays.asList(personData.get(1).getValues()), 
containsInAnyOrder("John Doe", 42));
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java
 
b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java
new file mode 100644
index 0000000..7d5eb12
--- /dev/null
+++ 
b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java
@@ -0,0 +1,535 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.swing.table.TableModel;
+
+import org.apache.http.HttpHost;
+import org.apache.metamodel.MetaModelHelper;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.UpdateScript;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.create.CreateTable;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.DataSetTableModel;
+import org.apache.metamodel.data.InMemoryDataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.delete.DeleteFrom;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FunctionType;
+import org.apache.metamodel.query.Query;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.query.parser.QueryParserException;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.update.Update;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ElasticSearchRestDataContextIT {
+    private static final String DEFAULT_DOCKER_HOST_NAME = "localhost";
+
+    private static final String indexName = "twitter";
+    private static final String indexType1 = "tweet1";
+    private static final String indexType2 = "tweet2";
+    private static final String bulkIndexType = "bulktype";
+    private static final String peopleIndexType = "peopletype";
+
+    private static ElasticSearchRestClient client;
+
+    private static UpdateableDataContext dataContext;
+    
+    public static String determineHostName() throws URISyntaxException {
+        final String dockerHost = System.getenv("DOCKER_HOST");
+
+        if (dockerHost == null) {
+            // If no value is returned for the DOCKER_HOST environment variable fall back to a default.
+            return DEFAULT_DOCKER_HOST_NAME;
+        } else {
+            return (new URI(dockerHost)).getHost();
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        final String dockerHostAddress = determineHostName();
+        
+        client = new ElasticSearchRestClient(RestClient.builder(new 
HttpHost(dockerHostAddress, 9200)).build()); 
+
+        indexTweeterDocument(indexType1, 1);
+        indexTweeterDocument(indexType2, 1);
+        indexTweeterDocument(indexType2, 2, null);
+        insertPeopleDocuments();
+        indexTweeterDocument(indexType2, 1);
+        indexBulkDocuments(indexName, bulkIndexType, 10);
+        
+        client.refresh(indexName);
+
+        dataContext = new ElasticSearchRestDataContext(client, indexName);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        client.delete(indexName);
+    }
+
+    private static void insertPeopleDocuments() throws IOException {
+        indexOnePeopleDocument("female", 20, 5);
+        indexOnePeopleDocument("female", 17, 8);
+        indexOnePeopleDocument("female", 18, 9);
+        indexOnePeopleDocument("female", 19, 10);
+        indexOnePeopleDocument("female", 20, 11);
+        indexOnePeopleDocument("male", 19, 1);
+        indexOnePeopleDocument("male", 17, 2);
+        indexOnePeopleDocument("male", 18, 3);
+        indexOnePeopleDocument("male", 18, 4);
+    }
+
+    @Test
+    public void testSimpleQuery() throws Exception {
+        assertEquals("[bulktype, peopletype, tweet1, tweet2]",
+                
Arrays.toString(dataContext.getDefaultSchema().getTableNames().toArray()));
+
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
+
+        assertThat(table.getColumnNames(), containsInAnyOrder("_id", 
"message", "postDate", "user"));
+
+        assertEquals(ColumnType.STRING, 
table.getColumnByName("user").getType());
+        assertEquals(ColumnType.DATE, 
table.getColumnByName("postDate").getType());
+        assertEquals(ColumnType.BIGINT, 
table.getColumnByName("message").getType());
+
+        try (DataSet ds = 
dataContext.query().from(indexType1).select("user").and("message").execute()) {
+            assertEquals(ElasticSearchRestDataSet.class, ds.getClass());
+
+            assertTrue(ds.next());
+            assertEquals("Row[values=[user1, 1]]", ds.getRow().toString());
+        }
+    }
+
+    @Test
+    public void testDocumentIdAsPrimaryKey() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet2");
+        Column[] pks = table.getPrimaryKeys().toArray(new Column[0]);
+        assertEquals(1, pks.length);
+        assertEquals("_id", pks[0].getName());
+
+        try (DataSet ds = dataContext.query().from(table).select("user", 
"_id").orderBy("_id").asc().execute()) {
+            assertTrue(ds.next());
+            assertEquals("Row[values=[user1, tweet_tweet2_1]]", 
ds.getRow().toString());
+        }
+    }
+
+    @Test
+    public void testExecutePrimaryKeyLookupQuery() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet2");
+        Column[] pks = table.getPrimaryKeys().toArray(new Column[0]);
+
+        try (DataSet ds = 
dataContext.query().from(table).selectAll().where(pks[0]).eq("tweet_tweet2_1").execute())
 {
+            assertTrue(ds.next());
+            Object dateValue = ds.getRow().getValue(1);
+            assertEquals("Row[values=[tweet_tweet2_1, " + dateValue + ", 1, 
user1]]", ds.getRow().toString());
+
+            assertFalse(ds.next());
+
+            assertEquals(InMemoryDataSet.class, ds.getClass());
+        }
+    }
+
+    @Test
+    public void testDateIsHandledAsDate() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
+        Column column = table.getColumnByName("postDate");
+        ColumnType type = column.getType();
+        assertEquals(ColumnType.DATE, type);
+
+        DataSet dataSet = 
dataContext.query().from(table).select(column).execute();
+        while (dataSet.next()) {
+            Object value = dataSet.getRow().getValue(column);
+            assertTrue("Got class: " + value.getClass() + ", expected Date (or subclass)", value instanceof Date);
+        }
+    }
+
+    @Test
+    public void testNumberIsHandledAsNumber() throws Exception {
+        Table table = 
dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+        Column column = table.getColumnByName("age");
+        ColumnType type = column.getType();
+        assertEquals(ColumnType.BIGINT, type);
+
+        DataSet dataSet = 
dataContext.query().from(table).select(column).execute();
+        while (dataSet.next()) {
+            Object value = dataSet.getRow().getValue(column);
+            assertTrue("Got class: " + value.getClass() + ", expected Number (or subclass)", value instanceof Number);
+        }
+    }
+
+    @Test
+    public void testCreateTableAndInsertQuery() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, 
"testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+        assertNotNull(table);
+        assertEquals("[" + ElasticSearchUtils.FIELD_ID + ", foo, bar]", 
Arrays.toString(table.getColumnNames().toArray()));
+
+        final Column fooColumn = table.getColumnByName("foo");
+        final Column idColumn = table.getPrimaryKeys().get(0);
+        
assertEquals("Column[name=_id,columnNumber=0,type=STRING,nullable=null,nativeType=null,columnSize=null]",
+                idColumn.toString());
+
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 
42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 
43).execute();
+            }
+        });
+
+        dataContext.refreshSchemas();
+
+
+        try (DataSet ds = 
dataContext.query().from(table).selectAll().orderBy("bar").execute()) {
+            assertTrue(ds.next());
+            assertEquals("hello", ds.getRow().getValue(fooColumn).toString());
+            assertNotNull(ds.getRow().getValue(idColumn));
+            assertTrue(ds.next());
+            assertEquals("world", ds.getRow().getValue(fooColumn).toString());
+            assertNotNull(ds.getRow().getValue(idColumn));
+            assertFalse(ds.next());
+        }
+    }
+
+    @Test
+    public void testDeleteAll() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, 
"testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 
42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 
43).execute();
+            }
+        });
+
+        dataContext.executeUpdate(new DeleteFrom(table));
+
+        Row row = MetaModelHelper.executeSingleRowQuery(dataContext, 
dataContext.query().from(table).selectCount()
+                .toQuery());
+        assertEquals("Count is wrong", 0, ((Number) 
row.getValue(0)).intValue());
+    }
+
+    @Test
+    public void testDeleteByQuery() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, 
"testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 
42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 
43).execute();
+            }
+        });
+
+        dataContext.executeUpdate(new 
DeleteFrom(table).where("foo").eq("hello").where("bar").eq(42));
+
+        Row row = MetaModelHelper.executeSingleRowQuery(dataContext,
+                dataContext.query().from(table).select("foo", 
"bar").toQuery());
+        assertEquals("Row[values=[world, 43]]", row.toString());
+    }
+
+    @Test
+    public void testDeleteUnsupportedQueryType() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, 
"testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 
42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 
43).execute();
+            }
+        });
+
+        // greater than is not supported
+        try {
+            dataContext.executeUpdate(new 
DeleteFrom(table).where("bar").gt(40));
+            fail("Exception expected");
+        } catch (UnsupportedOperationException e) {
+            assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]", e
+                    .getMessage());
+        }
+    }
+
+    @Test
+    public void testUpdateRow() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, 
"testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 
42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 
43).execute();
+            }
+        });
+
+        dataContext.executeUpdate(new Update(table).value("foo", 
"howdy").where("bar").eq(42));
+        
+        DataSet dataSet = dataContext.query().from(table).select("foo", 
"bar").orderBy("bar").execute();
+        assertTrue(dataSet.next());
+        assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
+        assertTrue(dataSet.next());
+        assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
+        assertFalse(dataSet.next());
+        dataSet.close();
+    }
+
+    @Test
+    public void testWhereColumnEqualsValues() throws Exception {
+        try (DataSet ds = 
dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
+                .isEquals("user4").execute()) {
+            assertEquals(ElasticSearchRestDataSet.class, ds.getClass());
+
+            assertTrue(ds.next());
+            assertEquals("Row[values=[user4, 4]]", ds.getRow().toString());
+            assertFalse(ds.next());
+        }
+    }
+
+    @Test
+    public void testWhereColumnIsNullValues() throws Exception {
+        try (DataSet ds = 
dataContext.query().from(indexType2).select("message").where("postDate")
+                .isNull().execute()) {
+            assertEquals(ElasticSearchRestDataSet.class, ds.getClass());
+
+            assertTrue(ds.next());
+            assertEquals("Row[values=[2]]", ds.getRow().toString());
+            assertFalse(ds.next());
+        }
+    }
+
+    @Test
+    public void testWhereColumnIsNotNullValues() throws Exception {
+        try (DataSet ds = 
dataContext.query().from(indexType2).select("message").where("postDate")
+                .isNotNull().execute()) {
+            assertEquals(ElasticSearchRestDataSet.class, ds.getClass());
+
+            assertTrue(ds.next());
+            assertEquals("Row[values=[1]]", ds.getRow().toString());
+            assertFalse(ds.next());
+        }
+    }
+
+    @Test
+    public void testWhereMultiColumnsEqualValues() throws Exception {
+        try (DataSet ds = 
dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
+                .isEquals("user4").and("message").ne(5).execute()) {
+            assertEquals(ElasticSearchRestDataSet.class, ds.getClass());
+
+            assertTrue(ds.next());
+            assertEquals("Row[values=[user4, 4]]", ds.getRow().toString());
+            assertFalse(ds.next());
+        }
+    }
+
+    @Test
+    public void testWhereColumnInValues() throws Exception {
+        try (DataSet ds = 
dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
+                .in("user4", "user5").orderBy("message").execute()) {
+            assertTrue(ds.next());
+
+            String row1 = ds.getRow().toString();
+            assertEquals("Row[values=[user4, 4]]", row1);
+            assertTrue(ds.next());
+
+            String row2 = ds.getRow().toString();
+            assertEquals("Row[values=[user5, 5]]", row2);
+
+            assertFalse(ds.next());
+        }
+    }
+
+    @Test
+    public void testGroupByQuery() throws Exception {
+        Table table = 
dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+
+        Query q = new Query();
+        q.from(table);
+        q.groupBy(table.getColumnByName("gender"));
+        q.select(new SelectItem(table.getColumnByName("gender")),
+                new SelectItem(FunctionType.MAX, table.getColumnByName("age")),
+                new SelectItem(FunctionType.MIN, 
table.getColumnByName("age")), new SelectItem(FunctionType.COUNT, "*",
+                        "total"), new SelectItem(FunctionType.MIN, 
table.getColumnByName("id")).setAlias("firstId"));
+        q.orderBy("gender");
+        DataSet data = dataContext.executeQuery(q);
+        assertEquals(
+                "[peopletype.gender, MAX(peopletype.age), MIN(peopletype.age), COUNT(*) AS total, MIN(peopletype.id) AS firstId]",
+                Arrays.toString(data.getSelectItems().toArray()));
+
+        assertTrue(data.next());
+        assertEquals("Row[values=[female, 20, 17, 5, 5]]", 
data.getRow().toString());
+        assertTrue(data.next());
+        assertEquals("Row[values=[male, 19, 17, 4, 1]]", 
data.getRow().toString());
+        assertFalse(data.next());
+    }
+
+    @Test
+    public void testFilterOnNumberColumn() {
+        Table table = 
dataContext.getDefaultSchema().getTableByName(bulkIndexType);
+        Query q = 
dataContext.query().from(table).select("user").where("message").greaterThan(7).toQuery();
+        DataSet data = dataContext.executeQuery(q);
+        String[] expectations = new String[] { "Row[values=[user8]]", 
"Row[values=[user9]]" };
+
+        assertTrue(data.next());
+        
assertTrue(Arrays.asList(expectations).contains(data.getRow().toString()));
+        assertTrue(data.next());
+        
assertTrue(Arrays.asList(expectations).contains(data.getRow().toString()));
+        assertFalse(data.next());
+    }
+
+    @Test
+    public void testMaxRows() throws Exception {
+        Table table = 
dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+        Query query = new 
Query().from(table).select(table.getColumns()).setMaxRows(5);
+        DataSet dataSet = dataContext.executeQuery(query);
+
+        TableModel tableModel = new DataSetTableModel(dataSet);
+        assertEquals(5, tableModel.getRowCount());
+    }
+
+    @Test
+    public void testCountQuery() throws Exception {
+        Table table = 
dataContext.getDefaultSchema().getTableByName(bulkIndexType);
+        Query q = new Query().selectCount().from(table);
+
+        List<Object[]> data = dataContext.executeQuery(q).toObjectArrays();
+        assertEquals(1, data.size());
+        Object[] row = data.get(0);
+        assertEquals(1, row.length);
+        assertEquals(10, ((Number) row[0]).intValue());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testQueryForANonExistingTable() throws Exception {
+        
dataContext.query().from("nonExistingTable").select("user").and("message").execute();
+    }
+
+    @Test(expected = QueryParserException.class)
+    public void testQueryForAnExistingTableAndNonExistingField() throws 
Exception {
+        indexTweeterDocument(indexType1, 1);
+        
dataContext.query().from(indexType1).select("nonExistingField").execute();
+    }
+
+    private static void indexBulkDocuments(final String indexName, final 
String indexType, final int numberOfDocuments)
+            throws IOException {
+        final BulkRequest bulkRequest = new BulkRequest();
+
+        for (int i = 0; i < numberOfDocuments; i++) {
+            final IndexRequest indexRequest = new IndexRequest(indexName, 
indexType, Integer.toString(i));
+            indexRequest.source(buildTweeterJson(i));
+            
+            bulkRequest.add(indexRequest);
+        }
+        
+        client.bulk(bulkRequest);
+    }
+
+    private static void indexTweeterDocument(final String indexType, final int 
id, final Date date) throws IOException {
+        final IndexRequest indexRequest = new IndexRequest(indexName, 
indexType, "tweet_" + indexType + "_" + id);
+        indexRequest.source(buildTweeterJson(id, date));
+        
+        client.index(indexRequest);
+    }
+
+    private static void indexTweeterDocument(String indexType, int id) throws 
IOException {
+        final IndexRequest indexRequest = new IndexRequest(indexName, 
indexType, "tweet_" + indexType + "_" + id);
+        indexRequest.source(buildTweeterJson(id));
+        
+        client.index(indexRequest);
+    }
+
+    private static void indexOnePeopleDocument(String gender, int age, int id) 
throws IOException {
+        final IndexRequest indexRequest = new IndexRequest(indexName, 
peopleIndexType);
+        indexRequest.source(buildPeopleJson(gender, age, id));
+        
+        client.index(indexRequest);
+    }
+
+    private static Map<String, Object> buildTweeterJson(int elementId) {
+        return buildTweeterJson(elementId, new Date());
+    }
+
+    private static Map<String, Object> buildTweeterJson(int elementId, Date 
date) {
+        Map<String, Object> map = new LinkedHashMap<>();
+        map.put("user", "user" + elementId);
+        map.put("postDate", date);
+        map.put("message", elementId);
+        return map;
+    }
+
+    private static XContentBuilder buildPeopleJson(String gender, int age, int 
elementId) throws IOException {
+        return jsonBuilder().startObject().field("gender", 
gender).field("age", age).field("id", elementId).endObject();
+    }
+
+}

Reply via email to