Repository: metamodel Updated Branches: refs/heads/master 4397ac848 -> 49333c662
METAMODEL-184: Use missing and exists filters for null comparisons Fixes #48 Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/49333c66 Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/49333c66 Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/49333c66 Branch: refs/heads/master Commit: 49333c6629437c2bba589399e2ce8f83b50191d0 Parents: 4397ac8 Author: Kasper Sørensen <i.am.kasper.soren...@gmail.com> Authored: Fri Sep 4 12:44:48 2015 +0200 Committer: Kasper Sørensen <i.am.kasper.soren...@gmail.com> Committed: Fri Sep 4 12:44:48 2015 +0200 ---------------------------------------------------------------------- CHANGES.md | 1 + .../elasticsearch/ElasticSearchDataContext.java | 31 ++-- .../ElasticSearchDataContextTest.java | 164 +++++++++---------- 3 files changed, 95 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metamodel/blob/49333c66/CHANGES.md ---------------------------------------------------------------------- diff --git a/CHANGES.md b/CHANGES.md index 65ce47d..89baa1b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,7 @@ * [METAMODEL-171] - Made integration tests for Cassandra module function properly in all environments. * [METAMODEL-177] - Fixed a bug pertaining to the serializability of HdfsResource. * [METAMODEL-172] - ElasticSearch Date types should be converted properly. + * [METAMODEL-184] - ElasticSearch querying with "IS NULL" and "IS NOT NULL" now uses MissingFilter and ExistsFilter. ### Apache MetaModel 4.3.6 http://git-wip-us.apache.org/repos/asf/metamodel/blob/49333c66/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java index ba221d1..7bdd1ab 100644 --- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java +++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java @@ -61,6 +61,7 @@ import org.elasticsearch.common.hppc.ObjectLookupContainer; import org.elasticsearch.common.hppc.cursors.ObjectCursor; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.FilterBuilders; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.slf4j.Logger; @@ -68,7 +69,7 @@ import org.slf4j.LoggerFactory; /** * DataContext implementation for ElasticSearch analytics engine. - * + * * ElasticSearch has a data storage structure hierarchy that briefly goes like * this: * <ul> @@ -76,10 +77,10 @@ import org.slf4j.LoggerFactory; * <li>Document type (short: Type) (within an index)</li> * <li>Documents (of a particular type)</li> * </ul> - * + * * When instantiating this DataContext, an index name is provided. Within this * index, each document type is represented as a table. - * + * * This implementation supports either automatic discovery of a schema or manual * specification of a schema, through the {@link SimpleTableDef} class. */ @@ -123,7 +124,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem /** * Constructs a {@link ElasticSearchDataContext} and automatically detects * the schema structure/view on all indexes (see - * {@link #detectSchema(Client)}). + * {@link #detectSchema(Client, String)}). * * @param client * the ElasticSearch client @@ -143,7 +144,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem * * @param client * the client to inspect - * @param indexName2 + * @param indexName * @return a mutable schema instance, useful for further fine tuning by the * user. */ @@ -303,7 +304,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem /** * Creates, if possible, a {@link QueryBuilder} object which can be used to * push down one or more {@link FilterItem}s to ElasticSearch's backend. - * + * * @param table * @param whereItems * @param logicalOperator @@ -341,10 +342,20 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem switch (operator) { case EQUALS_TO: - itemQueryBuilder = QueryBuilders.termQuery(fieldName, operand); + if (operand == null) { + itemQueryBuilder = + QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(fieldName)); + } else { + itemQueryBuilder = QueryBuilders.termQuery(fieldName, operand); + } break; case DIFFERENT_FROM: - itemQueryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(fieldName, operand)); + if (operand == null) { + itemQueryBuilder = + QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(fieldName)); + } else { + itemQueryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(fieldName, operand)); + } break; case IN: final List<?> operands = CollectionUtils.toList(operand); @@ -432,7 +443,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem /** * Gets the {@link Client} that this {@link DataContext} is wrapping. - * + * * @return */ public Client getElasticSearchClient() { @@ -441,7 +452,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem /** * Gets the name of the index that this {@link DataContext} is working on. - * + * * @return */ public String getIndexName() { http://git-wip-us.apache.org/repos/asf/metamodel/blob/49333c66/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java index d421b99..6390dea 100644 --- a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java +++ b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java @@ -18,9 +18,7 @@ */ package org.apache.metamodel.elasticsearch; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.junit.Assert.*; - +import java.io.IOException; import java.util.Arrays; import java.util.Date; import java.util.LinkedHashMap; @@ -49,7 +47,6 @@ import org.apache.metamodel.schema.ColumnType; import org.apache.metamodel.schema.Schema; import org.apache.metamodel.schema.Table; import org.apache.metamodel.update.Update; -import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -59,9 +56,11 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.XContentBuilder; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.junit.Assert.*; + public class ElasticSearchDataContextTest { private static final String indexName = "twitter"; @@ -80,10 +79,11 @@ public class ElasticSearchDataContextTest { public static void beforeTests() throws Exception { embeddedElasticsearchServer = new EmbeddedElasticsearchServer(); client = embeddedElasticsearchServer.getClient(); - indexOneTweeterDocumentPerIndex(indexType1, 1); - indexOneTweeterDocumentPerIndex(indexType2, 1); + indexTweeterDocument(indexType1, 1); + indexTweeterDocument(indexType2, 1); + indexTweeterDocument(indexType2, 2, null); insertPeopleDocuments(); - indexOneTweeterDocumentPerIndex(indexType2, 1); + indexTweeterDocument(indexType2, 1); indexBulkDocuments(indexName, bulkIndexType, 10); // The refresh API allows to explicitly refresh one or more index, @@ -94,7 +94,7 @@ public class ElasticSearchDataContextTest { System.out.println("Embedded ElasticSearch server created!"); } - private static void insertPeopleDocuments() { + private static void insertPeopleDocuments() throws IOException { indexOnePeopleDocument("female", 20, 5); indexOnePeopleDocument("female", 17, 8); indexOnePeopleDocument("female", 18, 9); @@ -125,14 +125,11 @@ public class ElasticSearchDataContextTest { assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType()); assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType()); - DataSet ds = dataContext.query().from(indexType1).select("user").and("message").execute(); - assertEquals(ElasticSearchDataSet.class, ds.getClass()); + try(DataSet ds = dataContext.query().from(indexType1).select("user").and("message").execute()) { + assertEquals(ElasticSearchDataSet.class, ds.getClass()); - try { assertTrue(ds.next()); assertEquals("Row[values=[user1, 1]]", ds.getRow().toString()); - } finally { - ds.close(); } } @@ -143,12 +140,9 @@ public class ElasticSearchDataContextTest { assertEquals(1, pks.length); assertEquals("_id", pks[0].getName()); - DataSet ds = dataContext.query().from(table).select("user", "_id").orderBy("_id").asc().execute(); - try { + try (DataSet ds = dataContext.query().from(table).select("user", "_id").orderBy("_id").asc().execute()) { assertTrue(ds.next()); assertEquals("Row[values=[user1, tweet_tweet2_1]]", ds.getRow().toString()); - } finally { - ds.close(); } } @@ -157,8 +151,7 @@ public class ElasticSearchDataContextTest { Table table = dataContext.getDefaultSchema().getTableByName("tweet2"); Column[] pks = table.getPrimaryKeys(); - DataSet ds = dataContext.query().from(table).selectAll().where(pks[0]).eq("tweet_tweet2_1").execute(); - try { + try (DataSet ds = dataContext.query().from(table).selectAll().where(pks[0]).eq("tweet_tweet2_1").execute()) { assertTrue(ds.next()); Object dateValue = ds.getRow().getValue(2); assertEquals("Row[values=[tweet_tweet2_1, 1, " + dateValue + ", user1]]", ds.getRow().toString()); @@ -166,14 +159,10 @@ public class ElasticSearchDataContextTest { assertFalse(ds.next()); assertEquals(InMemoryDataSet.class, ds.getClass()); - } finally { - ds.close(); } } - // TODO: Un-ignore this test, and it wil fail - needs fixin' @Test - @Ignore public void testDateIsHandledAsDate() throws Exception { Table table = dataContext.getDefaultSchema().getTableByName("tweet1"); Column column = table.getColumnByName("postDate"); @@ -225,8 +214,7 @@ public class ElasticSearchDataContextTest { } }); - final DataSet ds = dataContext.query().from(table).selectAll().orderBy("bar").execute(); - try { + try (DataSet ds = dataContext.query().from(table).selectAll().orderBy("bar").execute()) { assertTrue(ds.next()); assertEquals("hello", ds.getRow().getValue(fooColumn).toString()); assertNotNull(ds.getRow().getValue(idColumn)); @@ -234,8 +222,6 @@ public class ElasticSearchDataContextTest { assertEquals("world", ds.getRow().getValue(fooColumn).toString()); assertNotNull(ds.getRow().getValue(idColumn)); assertFalse(ds.next()); - } finally { - ds.close(); } dataContext.executeUpdate(new DropTable(table)); @@ -389,40 +375,56 @@ public class ElasticSearchDataContextTest { @Test public void testWhereColumnEqualsValues() throws Exception { - DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") - .isEquals("user4").execute(); - assertEquals(ElasticSearchDataSet.class, ds.getClass()); + try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .isEquals("user4").execute()) { + assertEquals(ElasticSearchDataSet.class, ds.getClass()); - try { assertTrue(ds.next()); assertEquals("Row[values=[user4, 4]]", ds.getRow().toString()); assertFalse(ds.next()); - } finally { - ds.close(); + } + } + + @Test + public void testWhereColumnIsNullValues() throws Exception { + try(DataSet ds = dataContext.query().from(indexType2).select("message").where("postDate") + .isNull().execute()){ + assertEquals(ElasticSearchDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[2]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereColumnIsNotNullValues() throws Exception { + try(DataSet ds = dataContext.query().from(indexType2).select("message").where("postDate") + .isNotNull().execute()){ + assertEquals(ElasticSearchDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[1]]", ds.getRow().toString()); + assertFalse(ds.next()); } } @Test public void testWhereMultiColumnsEqualValues() throws Exception { - DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") - .isEquals("user4").and("message").ne(5).execute(); - assertEquals(ElasticSearchDataSet.class, ds.getClass()); + try(DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .isEquals("user4").and("message").ne(5).execute()){ + assertEquals(ElasticSearchDataSet.class, ds.getClass()); - try { assertTrue(ds.next()); assertEquals("Row[values=[user4, 4]]", ds.getRow().toString()); assertFalse(ds.next()); - } finally { - ds.close(); } } @Test public void testWhereColumnInValues() throws Exception { - DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") - .in("user4", "user5").orderBy("message").execute(); - - try { + try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .in("user4", "user5").orderBy("message").execute()) { assertTrue(ds.next()); String row1 = ds.getRow().toString(); @@ -433,8 +435,6 @@ public class ElasticSearchDataContextTest { assertEquals("Row[values=[user5, 5]]", row2); assertFalse(ds.next()); - } finally { - ds.close(); } } @@ -498,29 +498,15 @@ public class ElasticSearchDataContextTest { assertEquals("[10]", Arrays.toString(row)); } - @Test + @Test(expected = IllegalArgumentException.class) public void testQueryForANonExistingTable() throws Exception { - boolean thrown = false; - try { - dataContext.query().from("nonExistingTable").select("user").and("message").execute(); - } catch (IllegalArgumentException IAex) { - thrown = true; - } - assertTrue(thrown); + dataContext.query().from("nonExistingTable").select("user").and("message").execute(); } - @Test + @Test(expected = IllegalArgumentException.class) public void testQueryForAnExistingTableAndNonExistingField() throws Exception { - indexOneTweeterDocumentPerIndex(indexType1, 1); - boolean thrown = false; - try { - dataContext.query().from(indexType1).select("nonExistingField").execute(); - } catch (IllegalArgumentException IAex) { - thrown = true; - } finally { - // ds.close(); - } - assertTrue(thrown); + indexTweeterDocument(indexType1, 1); + dataContext.query().from(indexType1).select("nonExistingField").execute(); } @Test @@ -547,45 +533,41 @@ public class ElasticSearchDataContextTest { private static void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) { BulkRequestBuilder bulkRequest = client.prepareBulk(); - try { - for (int i = 0; i < numberOfDocuments; i++) { - bulkRequest.add(client.prepareIndex(indexName, indexType, new Integer(i).toString()).setSource( - buildTweeterJson(i))); - } - bulkRequest.execute().actionGet(); - } catch (Exception ex) { - System.out.println("Exception indexing documents!!!!!"); + for (int i = 0; i < numberOfDocuments; i++) { + bulkRequest.add(client.prepareIndex(indexName, indexType, Integer.toString(i)).setSource( + buildTweeterJson(i))); } + bulkRequest.execute().actionGet(); + } + private static void indexTweeterDocument(String indexType, int id, Date date) { + client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id, date)) + .setId("tweet_" + indexType + "_" + id).execute().actionGet(); } - private static void indexOneTweeterDocumentPerIndex(String indexType, int id) { - try { - client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id)) - .setId("tweet_" + indexType + "_" + id).execute().actionGet(); - } catch (Exception ex) { - System.out.println("Exception indexing documents!!!!!"); - } + private static void indexTweeterDocument(String indexType, int id) { + client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id)) + .setId("tweet_" + indexType + "_" + id).execute().actionGet(); } - private static void indexOnePeopleDocument(String gender, int age, int id) { - try { - client.prepareIndex(indexName, peopleIndexType).setSource(buildPeopleJson(gender, age, id)).execute() - .actionGet(); - } catch (Exception ex) { - System.out.println("Exception indexing documents!!!!!"); - } + private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException { + client.prepareIndex(indexName, peopleIndexType).setSource(buildPeopleJson(gender, age, id)).execute() + .actionGet(); + } + + private static Map<String, Object> buildTweeterJson(int elementId) { + return buildTweeterJson(elementId, new Date()); } - private static Map<String, Object> buildTweeterJson(int elementId) throws Exception { - Map<String, Object> map = new LinkedHashMap<String, Object>(); + private static Map<String, Object> buildTweeterJson(int elementId, Date date) { + Map<String, Object> map = new LinkedHashMap<>(); map.put("user", "user" + elementId); - map.put("postDate", new Date()); + map.put("postDate", date); map.put("message", elementId); return map; } - private static XContentBuilder buildPeopleJson(String gender, int age, int elementId) throws Exception { + private static XContentBuilder buildPeopleJson(String gender, int age, int elementId) throws IOException { return jsonBuilder().startObject().field("gender", gender).field("age", age).field("id", elementId).endObject(); }