http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
 
b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
new file mode 100644
index 0000000..0ae170c
--- /dev/null
+++ 
b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
@@ -0,0 +1,606 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.swing.table.TableModel;
+
+import org.apache.metamodel.MetaModelHelper;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.UpdateScript;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.create.CreateTable;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.DataSetTableModel;
+import org.apache.metamodel.data.InMemoryDataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.delete.DeleteFrom;
+import org.apache.metamodel.drop.DropTable;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import 
org.apache.metamodel.elasticsearch.nativeclient.utils.EmbeddedElasticsearchServer;
+import org.apache.metamodel.query.FunctionType;
+import org.apache.metamodel.query.Query;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.update.Update;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import 
org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import 
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.*;
+
+public class ElasticSearchDataContextTest {
+
+    private static final String indexName = "twitter";
+    private static final String indexType1 = "tweet1";
+    private static final String indexType2 = "tweet2";
+    private static final String indexName2 = "twitter2";
+    private static final String indexType3 = "tweet3";
+    private static final String bulkIndexType = "bulktype";
+    private static final String peopleIndexType = "peopletype";
+    private static final String mapping = 
"{\"date_detection\":\"false\",\"properties\":{\"message\":{\"type\":\"string\",\"index\":\"not_analyzed\",\"doc_values\":\"true\"}}}";
+    private static EmbeddedElasticsearchServer embeddedElasticsearchServer;
+    private static Client client;
+    private static UpdateableDataContext dataContext;
+
+    @BeforeClass
+    public static void beforeTests() throws Exception {
+        embeddedElasticsearchServer = new EmbeddedElasticsearchServer();
+        client = embeddedElasticsearchServer.getClient();
+        indexTweeterDocument(indexType1, 1);
+        indexTweeterDocument(indexType2, 1);
+        indexTweeterDocument(indexType2, 2, null);
+        insertPeopleDocuments();
+        indexTweeterDocument(indexType2, 1);
+        indexBulkDocuments(indexName, bulkIndexType, 10);
+
+        // The refresh API allows to explicitly refresh one or more index,
+        // making all operations performed since the last refresh available for
+        // search
+        
embeddedElasticsearchServer.getClient().admin().indices().prepareRefresh().execute().actionGet();
+        dataContext = new ElasticSearchDataContext(client, indexName);
+        System.out.println("Embedded ElasticSearch server created!");
+    }
+
+    private static void insertPeopleDocuments() throws IOException {
+        indexOnePeopleDocument("female", 20, 5);
+        indexOnePeopleDocument("female", 17, 8);
+        indexOnePeopleDocument("female", 18, 9);
+        indexOnePeopleDocument("female", 19, 10);
+        indexOnePeopleDocument("female", 20, 11);
+        indexOnePeopleDocument("male", 19, 1);
+        indexOnePeopleDocument("male", 17, 2);
+        indexOnePeopleDocument("male", 18, 3);
+        indexOnePeopleDocument("male", 18, 4);
+    }
+
+    @AfterClass
+    public static void afterTests() {
+        embeddedElasticsearchServer.shutdown();
+        System.out.println("Embedded ElasticSearch server shut down!");
+    }
+
+    @Test
+    public void testSimpleQuery() throws Exception {
+        assertEquals("[bulktype, peopletype, tweet1, tweet2]",
+                
Arrays.toString(dataContext.getDefaultSchema().getTableNames()));
+
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
+
+        assertEquals("[_id, message, postDate, user]", 
Arrays.toString(table.getColumnNames()));
+
+        assertEquals(ColumnType.STRING, 
table.getColumnByName("user").getType());
+        assertEquals(ColumnType.DATE, 
table.getColumnByName("postDate").getType());
+        assertEquals(ColumnType.BIGINT, 
table.getColumnByName("message").getType());
+
+        try(DataSet ds = 
dataContext.query().from(indexType1).select("user").and("message").execute()) {
+            assertEquals(ElasticSearchDataSet.class, ds.getClass());
+
+            assertTrue(ds.next());
+            assertEquals("Row[values=[user1, 1]]", ds.getRow().toString());
+        }
+    }
+
+    @Test
+    public void testDocumentIdAsPrimaryKey() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet2");
+        Column[] pks = table.getPrimaryKeys();
+        assertEquals(1, pks.length);
+        assertEquals("_id", pks[0].getName());
+
+        try (DataSet ds = dataContext.query().from(table).select("user", 
"_id").orderBy("_id").asc().execute()) {
+            assertTrue(ds.next());
+            assertEquals("Row[values=[user1, tweet_tweet2_1]]", 
ds.getRow().toString());
+        }
+    }
+
+    @Test
+    public void testExecutePrimaryKeyLookupQuery() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet2");
+        Column[] pks = table.getPrimaryKeys();
+
+        try (DataSet ds = 
dataContext.query().from(table).selectAll().where(pks[0]).eq("tweet_tweet2_1").execute())
 {
+            assertTrue(ds.next());
+            Object dateValue = ds.getRow().getValue(2);
+            assertEquals("Row[values=[tweet_tweet2_1, 1, " + dateValue + ", 
user1]]", ds.getRow().toString());
+
+            assertFalse(ds.next());
+
+            assertEquals(InMemoryDataSet.class, ds.getClass());
+        }
+    }
+
+    @Test
+    public void testDateIsHandledAsDate() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
+        Column column = table.getColumnByName("postDate");
+        ColumnType type = column.getType();
+        assertEquals(ColumnType.DATE, type);
+
+        DataSet dataSet = 
dataContext.query().from(table).select(column).execute();
+        while (dataSet.next()) {
+            Object value = dataSet.getRow().getValue(column);
+            assertTrue("Got class: " + value.getClass() + ", expected Date (or 
subclass)", value instanceof Date);
+        }
+    }
+
+    @Test
+    public void testNumberIsHandledAsNumber() throws Exception {
+        Table table = 
dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+        Column column = table.getColumnByName("age");
+        ColumnType type = column.getType();
+        assertEquals(ColumnType.BIGINT, type);
+
+        DataSet dataSet = 
dataContext.query().from(table).select(column).execute();
+        while (dataSet.next()) {
+            Object value = dataSet.getRow().getValue(column);
+            assertTrue("Got class: " + value.getClass() + ", expected Number 
(or subclass)", value instanceof Number);
+        }
+    }
+
+    @Test
+    public void testCreateTableInsertQueryAndDrop() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, 
"testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+        assertEquals("[" + ElasticSearchUtils.FIELD_ID + ", foo, bar]", 
Arrays.toString(table.getColumnNames()));
+
+        final Column fooColumn = table.getColumnByName("foo");
+        final Column idColumn = table.getPrimaryKeys()[0];
+        
assertEquals("Column[name=_id,columnNumber=0,type=STRING,nullable=null,nativeType=null,columnSize=null]",
+                idColumn.toString());
+
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 
42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 
43).execute();
+            }
+        });
+
+        dataContext.refreshSchemas();
+
+        try (DataSet ds = 
dataContext.query().from(table).selectAll().orderBy("bar").execute()) {
+            assertTrue(ds.next());
+            assertEquals("hello", ds.getRow().getValue(fooColumn).toString());
+            assertNotNull(ds.getRow().getValue(idColumn));
+            assertTrue(ds.next());
+            assertEquals("world", ds.getRow().getValue(fooColumn).toString());
+            assertNotNull(ds.getRow().getValue(idColumn));
+            assertFalse(ds.next());
+        }
+
+        dataContext.executeUpdate(new DropTable(table));
+
+        dataContext.refreshSchemas();
+
+        assertNull(dataContext.getTableByQualifiedLabel(table.getName()));
+    }
+
+    @Test
+    public void testDetectOutsideChanges() throws Exception {
+        ElasticSearchDataContext elasticSearchDataContext = 
(ElasticSearchDataContext) dataContext;
+
+        // Create the type in ES
+        final IndicesAdminClient indicesAdmin = 
elasticSearchDataContext.getElasticSearchClient().admin().indices();
+        final String tableType = "outsideTable";
+
+        Object[] sourceProperties = { "testA", "type=string, store=true", 
"testB", "type=string, store=true" };
+
+        new 
PutMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).setSource(sourceProperties)
+                .execute().actionGet();
+
+        dataContext.refreshSchemas();
+
+        
assertNotNull(dataContext.getDefaultSchema().getTableByName(tableType));
+
+        new 
DeleteMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).execute().actionGet();
+        dataContext.refreshSchemas();
+        assertNull(dataContext.getTableByQualifiedLabel(tableType));
+    }
+
+    @Test
+    public void testDeleteAll() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, 
"testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 
42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 
43).execute();
+            }
+        });
+
+        dataContext.executeUpdate(new DeleteFrom(table));
+
+        Row row = MetaModelHelper.executeSingleRowQuery(dataContext, 
dataContext.query().from(table).selectCount()
+                .toQuery());
+        assertEquals("Row[values=[0]]", row.toString());
+
+        dataContext.executeUpdate(new DropTable(table));
+    }
+
+    @Test
+    public void testDeleteByQuery() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, 
"testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 
42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 
43).execute();
+            }
+        });
+
+        dataContext.executeUpdate(new 
DeleteFrom(table).where("foo").eq("hello").where("bar").eq(42));
+
+        Row row = MetaModelHelper.executeSingleRowQuery(dataContext,
+                dataContext.query().from(table).select("foo", 
"bar").toQuery());
+        assertEquals("Row[values=[world, 43]]", row.toString());
+
+        dataContext.executeUpdate(new DropTable(table));
+    }
+
+    @Test
+    public void testDeleteUnsupportedQueryType() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, 
"testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+        try {
+
+            dataContext.executeUpdate(new UpdateScript() {
+                @Override
+                public void run(UpdateCallback callback) {
+                    callback.insertInto(table).value("foo", 
"hello").value("bar", 42).execute();
+                    callback.insertInto(table).value("foo", 
"world").value("bar", 43).execute();
+                }
+            });
+
+            // greater than is not yet supported
+            try {
+                dataContext.executeUpdate(new 
DeleteFrom(table).where("bar").gt(40));
+                fail("Exception expected");
+            } catch (UnsupportedOperationException e) {
+                assertEquals("Could not push down WHERE items to delete by 
query request: [testCreateTable.bar > 40]",
+                        e.getMessage());
+            }
+
+        } finally {
+            dataContext.executeUpdate(new DropTable(table));
+        }
+    }
+
+    @Test
+    public void testUpdateRow() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, 
"testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+        try {
+
+            dataContext.executeUpdate(new UpdateScript() {
+                @Override
+                public void run(UpdateCallback callback) {
+                    callback.insertInto(table).value("foo", 
"hello").value("bar", 42).execute();
+                    callback.insertInto(table).value("foo", 
"world").value("bar", 43).execute();
+                }
+            });
+
+            dataContext.executeUpdate(new Update(table).value("foo", 
"howdy").where("bar").eq(42));
+
+            DataSet dataSet = dataContext.query().from(table).select("foo", 
"bar").orderBy("bar").execute();
+            assertTrue(dataSet.next());
+            assertEquals("Row[values=[howdy, 42]]", 
dataSet.getRow().toString());
+            assertTrue(dataSet.next());
+            assertEquals("Row[values=[world, 43]]", 
dataSet.getRow().toString());
+            assertFalse(dataSet.next());
+            dataSet.close();
+        } finally {
+            dataContext.executeUpdate(new DropTable(table));
+        }
+    }
+
+    @Test
+    public void testDropTable() throws Exception {
+        Table table = 
dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+
+        // assert that the table was there to begin with
+        {
+            DataSet ds = 
dataContext.query().from(table).selectCount().execute();
+            ds.next();
+            assertEquals("Row[values=[9]]", ds.getRow().toString());
+            ds.close();
+        }
+
+        dataContext.executeUpdate(new DropTable(table));
+        try {
+            DataSet ds = 
dataContext.query().from(table).selectCount().execute();
+            ds.next();
+            assertEquals("Row[values=[0]]", ds.getRow().toString());
+            ds.close();
+        } finally {
+            // restore the people documents for the next tests
+            insertPeopleDocuments();
+            client.admin().indices().prepareRefresh().execute().actionGet();
+            dataContext = new ElasticSearchDataContext(client, indexName);
+        }
+    }
+
+    @Test
+    public void testWhereColumnEqualsValues() throws Exception {
+        try (DataSet ds = 
dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
+                .isEquals("user4").execute()) {
+            assertEquals(ElasticSearchDataSet.class, ds.getClass());
+
+            assertTrue(ds.next());
+            assertEquals("Row[values=[user4, 4]]", ds.getRow().toString());
+            assertFalse(ds.next());
+        }
+    }
+
+    @Test
+    public void testWhereColumnIsNullValues() throws Exception {
+        try(DataSet ds = 
dataContext.query().from(indexType2).select("message").where("postDate")
+                .isNull().execute()){
+            assertEquals(ElasticSearchDataSet.class, ds.getClass());
+
+            assertTrue(ds.next());
+            assertEquals("Row[values=[2]]", ds.getRow().toString());
+            assertFalse(ds.next());
+        }
+    }
+
+    @Test
+    public void testWhereColumnIsNotNullValues() throws Exception {
+        try(DataSet ds = 
dataContext.query().from(indexType2).select("message").where("postDate")
+                .isNotNull().execute()){
+            assertEquals(ElasticSearchDataSet.class, ds.getClass());
+
+            assertTrue(ds.next());
+            assertEquals("Row[values=[1]]", ds.getRow().toString());
+            assertFalse(ds.next());
+        }
+    }
+
+    @Test
+    public void testWhereMultiColumnsEqualValues() throws Exception {
+        try(DataSet ds = 
dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
+                .isEquals("user4").and("message").ne(5).execute()){
+            assertEquals(ElasticSearchDataSet.class, ds.getClass());
+
+            assertTrue(ds.next());
+            assertEquals("Row[values=[user4, 4]]", ds.getRow().toString());
+            assertFalse(ds.next());
+        }
+    }
+
+    @Test
+    public void testWhereColumnInValues() throws Exception {
+        try (DataSet ds = 
dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
+                .in("user4", "user5").orderBy("message").execute()) {
+            assertTrue(ds.next());
+
+            String row1 = ds.getRow().toString();
+            assertEquals("Row[values=[user4, 4]]", row1);
+            assertTrue(ds.next());
+
+            String row2 = ds.getRow().toString();
+            assertEquals("Row[values=[user5, 5]]", row2);
+
+            assertFalse(ds.next());
+        }
+    }
+
+    @Test
+    public void testGroupByQuery() throws Exception {
+        Table table = 
dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+
+        Query q = new Query();
+        q.from(table);
+        q.groupBy(table.getColumnByName("gender"));
+        q.select(new SelectItem(table.getColumnByName("gender")),
+                new SelectItem(FunctionType.MAX, table.getColumnByName("age")),
+                new SelectItem(FunctionType.MIN, 
table.getColumnByName("age")), new SelectItem(FunctionType.COUNT, "*",
+                        "total"), new SelectItem(FunctionType.MIN, 
table.getColumnByName("id")).setAlias("firstId"));
+        q.orderBy("gender");
+        DataSet data = dataContext.executeQuery(q);
+        assertEquals(
+                "[peopletype.gender, MAX(peopletype.age), MIN(peopletype.age), 
COUNT(*) AS total, MIN(peopletype.id) AS firstId]",
+                Arrays.toString(data.getSelectItems()));
+
+        assertTrue(data.next());
+        assertEquals("Row[values=[female, 20, 17, 5, 5]]", 
data.getRow().toString());
+        assertTrue(data.next());
+        assertEquals("Row[values=[male, 19, 17, 4, 1]]", 
data.getRow().toString());
+        assertFalse(data.next());
+    }
+
+    @Test
+    public void testFilterOnNumberColumn() {
+        Table table = 
dataContext.getDefaultSchema().getTableByName(bulkIndexType);
+        Query q = 
dataContext.query().from(table).select("user").where("message").greaterThan(7).toQuery();
+        DataSet data = dataContext.executeQuery(q);
+        String[] expectations = new String[] { "Row[values=[user8]]", 
"Row[values=[user9]]" };
+
+        assertTrue(data.next());
+        
assertTrue(Arrays.asList(expectations).contains(data.getRow().toString()));
+        assertTrue(data.next());
+        
assertTrue(Arrays.asList(expectations).contains(data.getRow().toString()));
+        assertFalse(data.next());
+    }
+
+    @Test
+    public void testMaxRows() throws Exception {
+        Table table = 
dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+        Query query = new 
Query().from(table).select(table.getColumns()).setMaxRows(5);
+        DataSet dataSet = dataContext.executeQuery(query);
+
+        TableModel tableModel = new DataSetTableModel(dataSet);
+        assertEquals(5, tableModel.getRowCount());
+    }
+
+    @Test
+    public void testCountQuery() throws Exception {
+        Table table = 
dataContext.getDefaultSchema().getTableByName(bulkIndexType);
+        Query q = new Query().selectCount().from(table);
+
+        List<Object[]> data = dataContext.executeQuery(q).toObjectArrays();
+        assertEquals(1, data.size());
+        Object[] row = data.get(0);
+        assertEquals(1, row.length);
+        assertEquals("[10]", Arrays.toString(row));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testQueryForANonExistingTable() throws Exception {
+        
dataContext.query().from("nonExistingTable").select("user").and("message").execute();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testQueryForAnExistingTableAndNonExistingField() throws 
Exception {
+        indexTweeterDocument(indexType1, 1);
+        
dataContext.query().from(indexType1).select("nonExistingField").execute();
+    }
+
+    @Test
+    public void testNonDynamicMapingTableNames() throws Exception {
+        createIndex();
+
+        ElasticSearchDataContext dataContext2 = new 
ElasticSearchDataContext(client, indexName2);
+
+        assertEquals("[tweet3]", 
Arrays.toString(dataContext2.getDefaultSchema().getTableNames()));
+    }
+
+    private static void createIndex() {
+        CreateIndexRequest cir = new CreateIndexRequest(indexName2);
+        CreateIndexResponse response = 
client.admin().indices().create(cir).actionGet();
+
+        System.out.println("create index: " + response.isAcknowledged());
+
+        PutMappingRequest pmr = new 
PutMappingRequest(indexName2).type(indexType3).source(mapping);
+
+        PutMappingResponse response2 = 
client.admin().indices().putMapping(pmr).actionGet();
+        System.out.println("put mapping: " + response2.isAcknowledged());
+    }
+
+    private static void indexBulkDocuments(String indexName, String indexType, 
int numberOfDocuments) {
+        BulkRequestBuilder bulkRequest = client.prepareBulk();
+
+        for (int i = 0; i < numberOfDocuments; i++) {
+            bulkRequest.add(client.prepareIndex(indexName, indexType, 
Integer.toString(i)).setSource(
+                    buildTweeterJson(i)));
+        }
+        bulkRequest.execute().actionGet();
+    }
+
+    private static void indexTweeterDocument(String indexType, int id, Date 
date) {
+        client.prepareIndex(indexName, 
indexType).setSource(buildTweeterJson(id, date))
+                .setId("tweet_" + indexType + "_" + id).execute().actionGet();
+    }
+
+    private static void indexTweeterDocument(String indexType, int id) {
+        client.prepareIndex(indexName, 
indexType).setSource(buildTweeterJson(id))
+                .setId("tweet_" + indexType + "_" + id).execute().actionGet();
+    }
+
+    private static void indexOnePeopleDocument(String gender, int age, int id) 
throws IOException {
+        client.prepareIndex(indexName, 
peopleIndexType).setSource(buildPeopleJson(gender, age, id)).execute()
+                .actionGet();
+    }
+
+    private static Map<String, Object> buildTweeterJson(int elementId) {
+        return buildTweeterJson(elementId, new Date());
+    }
+
+    private static Map<String, Object> buildTweeterJson(int elementId, Date 
date) {
+        Map<String, Object> map = new LinkedHashMap<>();
+        map.put("user", "user" + elementId);
+        map.put("postDate", date);
+        map.put("message", elementId);
+        return map;
+    }
+
+    private static XContentBuilder buildPeopleJson(String gender, int age, int 
elementId) throws IOException {
+        return jsonBuilder().startObject().field("gender", 
gender).field("age", age).field("id", elementId).endObject();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
 
b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
new file mode 100644
index 0000000..8b5eb50
--- /dev/null
+++ 
b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.schema.ColumnType;
+import org.elasticsearch.common.collect.MapBuilder;
+
+public class ElasticSearchMetaDataParserTest extends TestCase {
+
+    public void testParseMetadataInfo() throws Exception {
+        Map<String, Object> metadata = new LinkedHashMap<>();
+        metadata.put("message", MapBuilder.newMapBuilder().put("type", 
"long").immutableMap());
+        metadata.put("postDate", MapBuilder.newMapBuilder().put("type", 
"date").put("format", "dateOptionalTime").immutableMap());
+        metadata.put("anotherDate", MapBuilder.newMapBuilder().put("type", 
"date").put("format", "dateOptionalTime").immutableMap());
+        metadata.put("user", MapBuilder.newMapBuilder().put("type", 
"string").immutableMap());
+        metadata.put("critical", MapBuilder.newMapBuilder().put("type", 
"boolean").immutableMap());
+        metadata.put("income", MapBuilder.newMapBuilder().put("type", 
"double").immutableMap());
+        metadata.put("untypedthingie", MapBuilder.newMapBuilder().put("foo", 
"bar").immutableMap());
+        
+        ElasticSearchMetaData metaData = 
ElasticSearchMetaDataParser.parse(metadata);
+        String[] columnNames = metaData.getColumnNames();
+        ColumnType[] columnTypes = metaData.getColumnTypes();
+
+        assertTrue(columnNames.length == 8);
+        assertEquals(columnNames[0], "_id");
+        assertEquals(columnNames[1], "message");
+        assertEquals(columnNames[2], "postDate");
+        assertEquals(columnNames[3], "anotherDate");
+        assertEquals(columnNames[4], "user");
+        assertEquals(columnNames[5], "critical");
+        assertEquals(columnNames[6], "income");
+        assertEquals(columnNames[7], "untypedthingie");
+        
+        assertTrue(columnTypes.length == 8);
+        assertEquals(columnTypes[0], ColumnType.STRING);
+        assertEquals(columnTypes[1], ColumnType.BIGINT);
+        assertEquals(columnTypes[2], ColumnType.DATE);
+        assertEquals(columnTypes[3], ColumnType.DATE);
+        assertEquals(columnTypes[4], ColumnType.STRING);
+        assertEquals(columnTypes[5], ColumnType.BOOLEAN);
+        assertEquals(columnTypes[6], ColumnType.DOUBLE);
+        assertEquals(columnTypes[7], ColumnType.STRING);
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
 
b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
new file mode 100644
index 0000000..9ffc6b8
--- /dev/null
+++ 
b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import junit.framework.TestCase;
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.MutableColumn;
+
+import java.util.*;
+
+public class ElasticSearchUtilsTest extends TestCase {
+
+    public void testAssignDocumentIdForPrimaryKeys() throws Exception {
+        MutableColumn primaryKeyColumn = new MutableColumn("value1", 
ColumnType.STRING).setPrimaryKey(true);
+        SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn);
+        List<SelectItem> selectItems1 = 
Collections.singletonList(primaryKeyItem);
+        String documentId = "doc1";
+        DataSetHeader header = new SimpleDataSetHeader(selectItems1);
+        Map<String, Object> values = new HashMap<>();
+        values.put("value1", "theValue");
+        Row row = NativeElasticSearchUtils.createRow(values, documentId, 
header);
+        String primaryKeyValue = (String) row.getValue(primaryKeyItem);
+
+        assertEquals(primaryKeyValue, documentId);
+    }
+
+    public void testCreateRowWithParsableDates() throws Exception {
+        SelectItem item1 = new SelectItem(new MutableColumn("value1", 
ColumnType.STRING));
+        SelectItem item2 = new SelectItem(new MutableColumn("value2", 
ColumnType.DATE));
+        List<SelectItem> selectItems1 = Arrays.asList(item1, item2);
+        String documentId = "doc1";
+        DataSetHeader header = new SimpleDataSetHeader(selectItems1);
+        Map<String, Object> values = new HashMap<>();
+        values.put("value1", "theValue");
+        values.put("value2", "2013-01-04T15:55:51.217+01:00");
+        Row row = NativeElasticSearchUtils.createRow(values, documentId, 
header);
+        Object stringValue = row.getValue(item1);
+        Object dateValue = row.getValue(item2);
+
+        assertTrue(stringValue instanceof String);
+        assertTrue(dateValue instanceof Date);
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
 
b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
new file mode 100644
index 0000000..b94d0ab
--- /dev/null
+++ 
b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.node.Node;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+public class EmbeddedElasticsearchServer {
+
+    private static final String DEFAULT_DATA_DIRECTORY = 
"target/elasticsearch-data";
+
+    private final Node node;
+    private final String dataDirectory;
+
+    public EmbeddedElasticsearchServer() {
+        this(DEFAULT_DATA_DIRECTORY);
+    }
+
+    public EmbeddedElasticsearchServer(String dataDirectory) {
+        this.dataDirectory = dataDirectory;
+
+        ImmutableSettings.Builder elasticsearchSettings = 
ImmutableSettings.settingsBuilder()
+                .put("http.enabled", "true")
+                .put("path.data", dataDirectory);
+
+        node = nodeBuilder()
+                .local(true)
+                .settings(elasticsearchSettings.build())
+                .node();
+    }
+
+    public Client getClient() {
+        return node.client();
+    }
+
+    public void shutdown() {
+        node.close();
+        deleteDataDirectory();
+    }
+
+    private void deleteDataDirectory() {
+        try {
+            FileUtils.deleteDirectory(new File(dataDirectory));
+        } catch (IOException e) {
+            throw new RuntimeException("Could not delete data directory of 
embedded elasticsearch server", e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index 275b75f..e3ccfb5 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -17,41 +17,16 @@
        </parent>
        <modelVersion>4.0.0</modelVersion>
        <artifactId>MetaModel-elasticsearch</artifactId>
-       <name>MetaModel module for Elasticsearch analytics engine</name>
+       <packaging>pom</packaging>
+       <name>MetaModel module for Elasticsearch</name>
 
        <properties>
                <elasticsearch.version>1.4.4</elasticsearch.version>
        </properties>
 
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.metamodel</groupId>
-                       <artifactId>MetaModel-core</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-               <dependency>
-                       <groupId>commons-io</groupId>
-                       <artifactId>commons-io</artifactId>
-               </dependency>
-               
-               <!-- elasticsearch -->
-               <dependency>
-                       <groupId>org.elasticsearch</groupId>
-                       <artifactId>elasticsearch</artifactId>
-                       <version>${elasticsearch.version}</version>
-               </dependency>
-               
-               <!-- test -->
-               <dependency>
-                       <groupId>org.slf4j</groupId>
-                       <artifactId>slf4j-log4j12</artifactId>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>junit</groupId>
-                       <artifactId>junit</artifactId>
-                       <scope>test</scope>
-               </dependency>
-       </dependencies>
-
+       <modules>
+               <module>common</module>
+               <module>native</module>
+               <module>rest</module>
+       </modules>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/pom.xml b/elasticsearch/rest/pom.xml
new file mode 100644
index 0000000..e4a4a79
--- /dev/null
+++ b/elasticsearch/rest/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <parent>
+               <artifactId>MetaModel-elasticsearch</artifactId>
+               <groupId>org.apache.metamodel</groupId>
+               <version>4.4.2-SNAPSHOT</version>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <properties>
+               <jest.version>0.1.7</jest.version>
+               <elasticsearch.version>1.4.4</elasticsearch.version>
+       </properties>
+
+       <artifactId>MetaModel-elasticsearch-rest</artifactId>
+       <name>MetaModel module for ElasticSearch via REST client</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.metamodel</groupId>
+                       <artifactId>MetaModel-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.metamodel</groupId>
+                       <artifactId>MetaModel-elasticsearch-common</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>commons-io</groupId>
+                       <artifactId>commons-io</artifactId>
+               </dependency>
+
+               <!-- Jest -->
+               <dependency>
+                       <groupId>io.searchbox</groupId>
+                       <artifactId>jest</artifactId>
+                       <version>${jest.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>commons-logging</groupId>
+                                       <artifactId>commons-logging</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>jcl-over-slf4j</artifactId>
+               </dependency>
+               <!-- elasticsearch -->
+               <dependency>
+                       <groupId>org.elasticsearch</groupId>
+                       <artifactId>elasticsearch</artifactId>
+                       <version>${elasticsearch.version}</version>
+               </dependency>
+
+               <!-- test -->
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-log4j12</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
new file mode 100644
index 0000000..2219b89
--- /dev/null
+++ 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.QueryPostprocessDataContext;
+import org.apache.metamodel.UpdateScript;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.LogicalOperator;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.MutableColumn;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.util.SimpleTableDef;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import io.searchbox.client.JestClient;
+import io.searchbox.client.JestResult;
+import io.searchbox.core.Count;
+import io.searchbox.core.CountResult;
+import io.searchbox.core.Get;
+import io.searchbox.core.Search;
+import io.searchbox.core.SearchResult;
+import io.searchbox.indices.mapping.GetMapping;
+import io.searchbox.params.Parameters;
+
+/**
+ * DataContext implementation for ElasticSearch analytics engine.
+ *
+ * ElasticSearch has a data storage structure hierarchy that briefly goes like
+ * this:
+ * <ul>
+ * <li>Index</li>
+ * <li>Document type (short: Type) (within an index)</li>
+ * <li>Documents (of a particular type)</li>
+ * </ul>
+ *
+ * When instantiating this DataContext, an index name is provided. Within this
+ * index, each document type is represented as a table.
+ *
+ * This implementation supports either automatic discovery of a schema or 
manual
+ * specification of a schema, through the {@link SimpleTableDef} class.
+ */
+public class ElasticSearchRestDataContext extends QueryPostprocessDataContext 
implements DataContext, UpdateableDataContext {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ElasticSearchRestDataContext.class);
+
+    public static final String FIELD_ID = "_id";
+
+    // 1 minute timeout
+    public static final String TIMEOUT_SCROLL = "1m";
+
+    private final JestClient elasticSearchClient;
+
+    private final String indexName;
+    // Table definitions that are set from the beginning, not supposed to be 
changed.
+    private final List<SimpleTableDef> staticTableDefinitions;
+
+    // Table definitions that are discovered, these can change
+    private final List<SimpleTableDef> dynamicTableDefinitions = new 
ArrayList<>();
+
+    /**
+     * Constructs a {@link ElasticSearchRestDataContext}. This constructor 
accepts a
+     * custom array of {@link SimpleTableDef}s which allows the user to define
+     * his own view on the indexes in the engine.
+     *
+     * @param client
+     *            the ElasticSearch client
+     * @param indexName
+     *            the name of the ElasticSearch index to represent
+     * @param tableDefinitions
+     *            an array of {@link SimpleTableDef}s, which define the table
+     *            and column model of the ElasticSearch index.
+     */
+    public ElasticSearchRestDataContext(JestClient client, String indexName, 
SimpleTableDef... tableDefinitions) {
+        if (client == null) {
+            throw new IllegalArgumentException("ElasticSearch Client cannot be 
null");
+        }
+        if (indexName == null || indexName.trim().length() == 0) {
+            throw new IllegalArgumentException("Invalid ElasticSearch Index 
name: " + indexName);
+        }
+        this.elasticSearchClient = client;
+        this.indexName = indexName;
+        this.staticTableDefinitions = Arrays.asList(tableDefinitions);
+        this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
+    }
+
+    /**
+     * Constructs a {@link ElasticSearchRestDataContext} and automatically 
detects
+     * the schema structure/view on all indexes (see
+     * {@link this.detectSchema(JestClient, String)}).
+     *
+     * @param client
+     *            the ElasticSearch client
+     * @param indexName
+     *            the name of the ElasticSearch index to represent
+     */
+    public ElasticSearchRestDataContext(JestClient client, String indexName) {
+        this(client, indexName, new SimpleTableDef[0]);
+    }
+
+    /**
+     * Performs an analysis of the available indexes in an ElasticSearch 
cluster
+     * {@link JestClient} instance and detects the elasticsearch types 
structure
+     * based on the metadata provided by the ElasticSearch java client.
+     *
+     * @see #detectTable(JsonObject, String)
+     * @return a mutable schema instance, useful for further fine tuning by the
+     *         user.
+     */
+    private SimpleTableDef[] detectSchema() {
+        logger.info("Detecting schema for index '{}'", indexName);
+
+        final JestResult jestResult;
+        try {
+            final GetMapping getMapping = new 
GetMapping.Builder().addIndex(indexName).build();
+            jestResult = elasticSearchClient.execute(getMapping);
+        } catch (Exception e) {
+            logger.error("Failed to retrieve mappings" , e);
+            throw new MetaModelException("Failed to execute request for index 
information needed to detect schema", e);
+        }
+
+        if(!jestResult.isSucceeded()){
+            logger.error("Failed to retrieve mappings; {}", 
jestResult.getErrorMessage());
+            throw new MetaModelException("Failed to retrieve mappings; " + 
jestResult.getErrorMessage());
+        }
+
+        final List<SimpleTableDef> result = new ArrayList<>();
+
+        final Set<Map.Entry<String, JsonElement>> mappings =
+                
jestResult.getJsonObject().getAsJsonObject(indexName).getAsJsonObject("mappings").entrySet();
+        if(mappings.size() == 0){
+            logger.warn("No metadata returned for index name '{}' - no tables 
will be detected.");
+        } else {
+
+            for (Map.Entry<String, JsonElement> entry : mappings) {
+                final String documentType = entry.getKey();
+
+                try {
+                    final SimpleTableDef table = 
detectTable(entry.getValue().getAsJsonObject().get("properties").getAsJsonObject(),
 documentType);
+                    result.add(table);
+                } catch (Exception e) {
+                    logger.error("Unexpected error during detectTable for 
document type '{}'", documentType, e);
+                }
+            }
+        }
+        final SimpleTableDef[] tableDefArray = result.toArray(new 
SimpleTableDef[result.size()]);
+        Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
+            @Override
+            public int compare(SimpleTableDef o1, SimpleTableDef o2) {
+                return o1.getName().compareTo(o2.getName());
+            }
+        });
+
+        return tableDefArray;
+    }
+
+    /**
+     * Performs an analysis of an available index type in an ElasticSearch
+     * {@link JestClient} client and tries to detect the index structure based 
on
+     * the metadata provided by the java client.
+     *
+     * @param metadataProperties
+     *            the ElasticSearch mapping
+     * @param documentType
+     *            the name of the index type
+     * @return a table definition for ElasticSearch.
+     */
+    private static SimpleTableDef detectTable(JsonObject metadataProperties, 
String documentType) {
+        final ElasticSearchMetaData metaData = 
JestElasticSearchMetaDataParser.parse(metadataProperties);
+        return new SimpleTableDef(documentType, metaData.getColumnNames(),
+                metaData.getColumnTypes());
+    }
+
+    @Override
+    protected Schema getMainSchema() throws MetaModelException {
+        final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
+        for (final SimpleTableDef tableDef : staticTableDefinitions) {
+            addTable(theSchema, tableDef);
+        }
+
+        final SimpleTableDef[] tables = detectSchema();
+        synchronized (this) {
+            dynamicTableDefinitions.clear();
+            dynamicTableDefinitions.addAll(Arrays.asList(tables));
+            for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
+                final List<String> tableNames = 
Arrays.asList(theSchema.getTableNames());
+
+                if (!tableNames.contains(tableDef.getName())) {
+                    addTable(theSchema, tableDef);
+                }
+            }
+        }
+
+        return theSchema;
+    }
+
+    private void addTable(final MutableSchema theSchema, final SimpleTableDef 
tableDef) {
+        final MutableTable table = tableDef.toTable().setSchema(theSchema);
+        final Column idColumn = table.getColumnByName(FIELD_ID);
+        if (idColumn != null && idColumn instanceof MutableColumn) {
+            final MutableColumn mutableColumn = (MutableColumn) idColumn;
+            mutableColumn.setPrimaryKey(true);
+        }
+        theSchema.addTable(table);
+    }
+
+    @Override
+    protected String getMainSchemaName() throws MetaModelException {
+        return indexName;
+    }
+
+    @Override
+    protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> 
selectItems,
+            List<FilterItem> whereItems, int firstRow, int maxRows) {
+        final QueryBuilder queryBuilder = ElasticSearchUtils
+                .createQueryBuilderForSimpleWhere(whereItems, 
LogicalOperator.AND);
+        if (queryBuilder != null) {
+            // where clause can be pushed down to an ElasticSearch query
+            SearchSourceBuilder searchSourceBuilder = 
createSearchRequest(firstRow, maxRows, queryBuilder);
+            SearchResult result = executeSearch(table, searchSourceBuilder, 
false);
+
+            return new JestElasticSearchDataSet(elasticSearchClient, result, 
selectItems);
+        }
+        return super.materializeMainSchemaTable(table, selectItems, 
whereItems, firstRow, maxRows);
+    }
+
+    private SearchResult executeSearch(Table table, SearchSourceBuilder 
searchSourceBuilder, boolean scroll) {
+        Search.Builder builder = new 
Search.Builder(searchSourceBuilder.toString()).addIndex(getIndexName()).addType(table.getName());
+        if(scroll){
+            builder.setParameter(Parameters.SCROLL, TIMEOUT_SCROLL);
+        }
+
+        Search search = builder.build();
+        SearchResult result;
+        try {
+            result = elasticSearchClient.execute(search);
+        } catch (Exception e){
+            logger.warn("Could not execute ElasticSearch query", e);
+            throw new MetaModelException("Could not execute ElasticSearch 
query", e);
+        }
+        return result;
+    }
+
+    @Override
+    protected DataSet materializeMainSchemaTable(Table table, Column[] 
columns, int maxRows) {
+        SearchResult searchResult = executeSearch(table, 
createSearchRequest(1, maxRows, null), limitMaxRowsIsSet(maxRows));
+
+        return new JestElasticSearchDataSet(elasticSearchClient, searchResult, 
columns);
+    }
+
+    private SearchSourceBuilder createSearchRequest(int firstRow, int maxRows, 
QueryBuilder queryBuilder) {
+        final SearchSourceBuilder searchRequest = new SearchSourceBuilder();
+        if (firstRow > 1) {
+            final int zeroBasedFrom = firstRow - 1;
+            searchRequest.from(zeroBasedFrom);
+        }
+        if (limitMaxRowsIsSet(maxRows)) {
+            searchRequest.size(maxRows);
+        }
+
+        if (queryBuilder != null) {
+            searchRequest.query(queryBuilder);
+        }
+
+        return searchRequest;
+    }
+
+    @Override
+    protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> 
selectItems, Column primaryKeyColumn,
+            Object keyValue) {
+        if (keyValue == null) {
+            return null;
+        }
+
+        final String documentType = table.getName();
+        final String id = keyValue.toString();
+
+        final Get get = new Get.Builder(indexName, 
id).type(documentType).build();
+        final JestResult getResult = 
JestClientExecutor.execute(elasticSearchClient, get);
+
+        final DataSetHeader header = new SimpleDataSetHeader(selectItems);
+
+        return 
JestElasticSearchUtils.createRow(getResult.getJsonObject().get("_source").getAsJsonObject(),
 id, header);
+    }
+
+    @Override
+    protected Number executeCountQuery(Table table, List<FilterItem> 
whereItems, boolean functionApproximationAllowed) {
+        if (!whereItems.isEmpty()) {
+            // not supported - will have to be done by counting client-side
+            return null;
+        }
+        final String documentType = table.getName();
+        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
+        sourceBuilder.query(QueryBuilders.termQuery("_type", documentType));
+
+        Count count = new 
Count.Builder().addIndex(indexName).query(sourceBuilder.toString()).build();
+
+        CountResult countResult;
+        try {
+            countResult = elasticSearchClient.execute(count);
+        } catch (Exception e){
+            logger.warn("Could not execute ElasticSearch get query", e);
+            throw new MetaModelException("Could not execute ElasticSearch get 
query", e);
+        }
+
+        return countResult.getCount();
+    }
+
+    private boolean limitMaxRowsIsSet(int maxRows) {
+        return (maxRows != -1);
+    }
+
+    @Override
+    public void executeUpdate(UpdateScript update) {
+        final JestElasticSearchUpdateCallback callback = new 
JestElasticSearchUpdateCallback(this);
+        update.run(callback);
+        callback.onExecuteUpdateFinished();
+    }
+
+    /**
+     * Gets the {@link JestClient} that this {@link DataContext} is wrapping.
+     */
+    public JestClient getElasticSearchClient() {
+        return elasticSearchClient;
+    }
+
+    /**
+     * Gets the name of the index that this {@link DataContext} is working on.
+     */
+    public String getIndexName() {
+        return indexName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
new file mode 100644
index 0000000..1bb026d
--- /dev/null
+++ 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import io.searchbox.action.Action;
+import io.searchbox.client.JestClient;
+import io.searchbox.client.JestResult;
+import org.apache.metamodel.MetaModelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+final class JestClientExecutor {
+    private static final Logger logger = 
LoggerFactory.getLogger(JestClientExecutor.class);
+
+    static <T extends JestResult> T execute(JestClient jestClient, Action<T> 
clientRequest) {
+        return execute(jestClient, clientRequest, true);
+    }
+
+    static <T extends JestResult> T execute(JestClient jestClient, Action<T> 
clientRequest, boolean doThrow) {
+        try {
+            final T result = jestClient.execute(clientRequest);
+            logger.debug("{} response: acknowledged={}", clientRequest, 
result.isSucceeded());
+            return result;
+        } catch (IOException e) {
+            logger.warn("Could not execute command {} ", clientRequest, e);
+            if (doThrow) {
+                throw new MetaModelException("Could not execute command " + 
clientRequest, e);
+            }
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
new file mode 100644
index 0000000..cc42b07
--- /dev/null
+++ 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import io.searchbox.action.GenericResultAbstractAction;
+
+public class JestDeleteScroll extends GenericResultAbstractAction {
+    private JestDeleteScroll(Builder builder) {
+        super(builder);
+        this.payload = builder.getScrollId();
+        setURI(buildURI());
+    }
+
+    @Override
+    public String getRestMethodName() {
+        return "DELETE";
+    }
+
+    @Override
+    protected String buildURI() {
+        return super.buildURI() + "/_search/scroll";
+    }
+
+    public static class Builder extends 
GenericResultAbstractAction.Builder<JestDeleteScroll, Builder> {
+        private final String scrollId;
+
+        public Builder(String scrollId) {
+            this.scrollId = scrollId;
+        }
+
+        @Override
+        public JestDeleteScroll build() {
+            return new JestDeleteScroll(this);
+        }
+
+        public String getScrollId() {
+            return scrollId;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
new file mode 100644
index 0000000..cc26c8d
--- /dev/null
+++ 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import io.searchbox.indices.mapping.PutMapping;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.create.AbstractTableCreationBuilder;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.schema.*;
+
+import java.util.List;
+
+final class JestElasticSearchCreateTableBuilder extends 
AbstractTableCreationBuilder<JestElasticSearchUpdateCallback> {
+    public JestElasticSearchCreateTableBuilder(JestElasticSearchUpdateCallback 
updateCallback, Schema schema, String name) {
+        super(updateCallback, schema, name);
+    }
+
+    @Override
+    public Table execute() throws MetaModelException {
+        final MutableTable table = getTable();
+        final List<Object> sourceProperties = 
ElasticSearchUtils.getSourceProperties(table);
+
+        final ElasticSearchRestDataContext dataContext = 
getUpdateCallback().getDataContext();
+        final String indexName = dataContext.getIndexName();
+
+        final PutMapping putMapping = new PutMapping.Builder(indexName, 
table.getName(), sourceProperties).build();
+        JestClientExecutor.execute(dataContext.getElasticSearchClient(), 
putMapping);
+
+        final MutableSchema schema = (MutableSchema) getSchema();
+        schema.addTable(table);
+        return table;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
new file mode 100644
index 0000000..9678b48
--- /dev/null
+++ 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import io.searchbox.client.JestClient;
+import io.searchbox.client.JestResult;
+import io.searchbox.core.SearchScroll;
+import org.apache.metamodel.data.AbstractDataSet;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.Column;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@link DataSet} implementation for ElasticSearch
+ */
+final class JestElasticSearchDataSet extends AbstractDataSet {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(JestElasticSearchDataSet.class);
+
+    private final JestClient _client;
+    private final AtomicBoolean _closed;
+
+    private JestResult _searchResponse;
+    private JsonObject _currentHit;
+    private int _hitIndex = 0;
+
+    public JestElasticSearchDataSet(JestClient client, JestResult 
searchResponse, List<SelectItem> selectItems) {
+        super(selectItems);
+        _client = client;
+        _searchResponse = searchResponse;
+        _closed = new AtomicBoolean(false);
+    }
+
+    public JestElasticSearchDataSet(JestClient client, JestResult 
searchResponse, Column[] columns) {
+        super(columns);
+        _client = client;
+        _searchResponse = searchResponse;
+        _closed = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        boolean closeNow = _closed.compareAndSet(true, false);
+        if (closeNow) {
+            final String scrollId = 
_searchResponse.getJsonObject().getAsJsonPrimitive("_scroll_id").getAsString();
+            JestClientExecutor.execute(_client, new 
JestDeleteScroll.Builder(scrollId).build(), false);
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        if (!_closed.get()) {
+            logger.warn("finalize() invoked, but DataSet is not closed. 
Invoking close() on {}", this);
+            close();
+        }
+    }
+
+    @Override
+    public boolean next() {
+        final JsonArray hits = 
_searchResponse.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits");
+        if (hits.size() == 0) {
+            // break condition for the scroll
+            _currentHit = null;
+            return false;
+        }
+
+        if (_hitIndex < hits.size()) {
+            // pick the next hit within this search response
+            _currentHit = hits.get(_hitIndex).getAsJsonObject();
+            _hitIndex++;
+            return true;
+        }
+
+        final JsonPrimitive scrollId = 
_searchResponse.getJsonObject().getAsJsonPrimitive("_scroll_id");
+        if (scrollId == null) {
+            // this search response is not scrollable - then it's the end.
+            _currentHit = null;
+            return false;
+        }
+
+        // try to scroll to the next set of hits
+        SearchScroll scroll = new SearchScroll.Builder(scrollId.getAsString(), 
ElasticSearchRestDataContext.TIMEOUT_SCROLL).build();
+
+        _searchResponse = JestClientExecutor.execute(_client, scroll);
+
+        // start over (recursively)
+        _hitIndex = 0;
+        return next();
+    }
+
+    @Override
+    public Row getRow() {
+        if (_currentHit == null) {
+            return null;
+        }
+
+        final JsonObject source = _currentHit.getAsJsonObject("_source");
+        final String documentId = _currentHit.get("_id").getAsString();
+        return JestElasticSearchUtils.createRow(source, documentId, 
getHeader());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
new file mode 100644
index 0000000..a4c0c03
--- /dev/null
+++ 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import io.searchbox.core.DeleteByQuery;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.delete.AbstractRowDeletionBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.LogicalOperator;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import java.util.List;
+
+/**
+ * {@link RowDeletionBuilder} implementation for
+ * {@link ElasticSearchRestDataContext}.
+ */
+final class JestElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
+    private final JestElasticSearchUpdateCallback _updateCallback;
+
+    public JestElasticSearchDeleteBuilder(JestElasticSearchUpdateCallback 
updateCallback, Table table) {
+        super(table);
+        _updateCallback = updateCallback;
+    }
+
+    @Override
+    public void execute() throws MetaModelException {
+        final Table table = getTable();
+        final String documentType = table.getName();
+
+        final ElasticSearchRestDataContext dataContext = 
_updateCallback.getDataContext();
+        final String indexName = dataContext.getIndexName();
+
+        final List<FilterItem> whereItems = getWhereItems();
+
+        // delete by query - note that creteQueryBuilderForSimpleWhere may
+        // return matchAllQuery() if no where items are present.
+        final QueryBuilder queryBuilder = 
ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
+                LogicalOperator.AND);
+        if (queryBuilder == null) {
+            // TODO: The where items could not be pushed down to a query. We
+            // could solve this by running a query first, gather all
+            // document IDs and then delete by IDs.
+            throw new UnsupportedOperationException("Could not push down WHERE 
items to delete by query request: "
+                    + whereItems);
+        }
+        final SearchSourceBuilder searchSourceBuilder = new 
SearchSourceBuilder();
+        searchSourceBuilder.query(queryBuilder);
+
+        final DeleteByQuery deleteByQuery =
+                new 
DeleteByQuery.Builder(searchSourceBuilder.toString()).addIndex(indexName).addType(
+                        documentType).build();
+
+        JestClientExecutor.execute(dataContext.getElasticSearchClient(), 
deleteByQuery);
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
new file mode 100644
index 0000000..d4ddd19
--- /dev/null
+++ 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import io.searchbox.indices.mapping.DeleteMapping;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.drop.AbstractTableDropBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TableDropBuilder} for dropping tables (document types) in an
+ * ElasticSearch index.
+ */
+final class JestElasticSearchDropTableBuilder extends AbstractTableDropBuilder 
{
+
+    private static final Logger logger = 
LoggerFactory.getLogger(JestElasticSearchDropTableBuilder.class);
+
+    private final JestElasticSearchUpdateCallback _updateCallback;
+
+    public JestElasticSearchDropTableBuilder(JestElasticSearchUpdateCallback 
updateCallback, Table table) {
+        super(table);
+        _updateCallback = updateCallback;
+    }
+
+    @Override
+    public void execute() throws MetaModelException {
+
+        final ElasticSearchRestDataContext dataContext = 
_updateCallback.getDataContext();
+        final Table table = getTable();
+        final String documentType = table.getName();
+        logger.info("Deleting mapping / document type: {}", documentType);
+
+        final DeleteMapping deleteIndex = new 
DeleteMapping.Builder(dataContext.getIndexName(), documentType).build();
+
+        JestClientExecutor.execute(dataContext.getElasticSearchClient(), 
deleteIndex);
+
+        final MutableSchema schema = (MutableSchema) table.getSchema();
+        schema.removeTable(table);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
new file mode 100644
index 0000000..327d7d3
--- /dev/null
+++ 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import io.searchbox.core.DocumentResult;
+import io.searchbox.core.Index;
+import io.searchbox.params.Parameters;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+final class JestElasticSearchInsertBuilder extends 
AbstractRowInsertionBuilder<JestElasticSearchUpdateCallback> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(JestElasticSearchInsertBuilder.class);
+
+    public JestElasticSearchInsertBuilder(JestElasticSearchUpdateCallback 
updateCallback, Table table) {
+        super(updateCallback, table);
+    }
+
+    @Override
+    public void execute() throws MetaModelException {
+        final ElasticSearchRestDataContext dataContext = 
getUpdateCallback().getDataContext();
+        final String indexName = dataContext.getIndexName();
+        final String documentType = getTable().getName();
+
+
+        final Map<String, Object> source = new HashMap<>();
+        final Column[] columns = getColumns();
+        final Object[] values = getValues();
+        String id = null;
+        for (int i = 0; i < columns.length; i++) {
+            if (isSet(columns[i])) {
+                final String name = columns[i].getName();
+                final Object value = values[i];
+                if (ElasticSearchRestDataContext.FIELD_ID.equals(name)) {
+                    if (value != null) {
+                        id = value.toString();
+                    }
+                } else {
+                    source.put(name, value);
+                }
+            }
+        }
+
+        assert !source.isEmpty();
+
+        Index index = new 
Index.Builder(source).index(indexName).type(documentType).id(id).setParameter(
+                Parameters.OP_TYPE, "create").build();
+
+        final DocumentResult result = 
JestClientExecutor.execute(dataContext.getElasticSearchClient(), index);
+
+        logger.debug("Inserted document: id={}", result.getId());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
new file mode 100644
index 0000000..074de2e
--- /dev/null
+++ 
b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.schema.ColumnType;
+
+import java.util.Map.Entry;
+
+/**
+ * Parser that transforms the ElasticSearch metadata response (json-like 
format)
+ * into an ElasticSearchMetaData object.
+ */
+final class JestElasticSearchMetaDataParser {
+
+    /**
+     * Parses the ElasticSearch meta data info into an ElasticSearchMetaData
+     * object. This method makes much easier to create the ElasticSearch 
schema.
+     *
+     * @param metaDataInfo
+     *            ElasticSearch mapping metadata in Map format
+     * @return An ElasticSearchMetaData object
+     */
+    public static ElasticSearchMetaData parse(JsonObject metaDataInfo) {
+        final int columns = metaDataInfo.entrySet().size() + 1;
+        final String[] fieldNames = new String[columns];
+        final ColumnType[] columnTypes = new ColumnType[columns];
+
+        // add the document ID field (fixed)
+        fieldNames[0] = ElasticSearchRestDataContext.FIELD_ID;
+        columnTypes[0] = ColumnType.STRING;
+
+        int i = 1;
+        for (Entry<String, JsonElement> metaDataField : 
metaDataInfo.entrySet()) {
+            JsonElement fieldMetadata = metaDataField.getValue();
+
+            fieldNames[i] = metaDataField.getKey();
+            columnTypes[i] = getColumnTypeFromMetadataField(fieldMetadata);
+            i++;
+
+        }
+        return new ElasticSearchMetaData(fieldNames, columnTypes);
+    }
+
+    private static ColumnType getColumnTypeFromMetadataField(JsonElement 
fieldMetadata) {
+        final JsonElement typeElement = ((JsonObject) 
fieldMetadata).get("type");
+        if (typeElement != null) {
+            String metaDataFieldType = typeElement.getAsString();
+
+            return 
ElasticSearchUtils.getColumnTypeFromElasticSearchType(metaDataFieldType);
+        } else {
+            return ColumnType.STRING;
+        }
+    }
+}

Reply via email to