This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 60ca5f22eb0f880a1ade3a3888231ce9b2c04b1c Author: Vihang Karajgaonkar <vih...@cloudera.com> AuthorDate: Thu Feb 28 17:48:25 2019 -0800 IMPALA-8266 : Event filtering logic may not filter all the events This patch fixes a bug in event filtering logic. The bug shows up when at least one event is filtered out and then an inverse event immediately shows up after a create_table or create_database event. For example, consider an event stream that has the following sequence: create_database, create_table, drop_table, drop_database. In such a case only the first create_database gets filtered out instead of both the create_database and create_table events. This leads to an exception while processing create_table since the database creation is skipped. Testing done: 1. Adds additional cases in the existing test which generate such sequences of events. Change-Id: Iaeaa26017ee223cca18344e5e1d6ace87200fd9c Reviewed-on: http://gerrit.cloudera.org:8080/12641 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- .../impala/catalog/events/MetastoreEvents.java | 21 +++--- .../events/MetastoreEventsProcessorTest.java | 87 +++++++++++++++------- 2 files changed, 73 insertions(+), 35 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index f94ddc3..967d5e1 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -173,22 +173,23 @@ public class MetastoreEvents { } Iterator<MetastoreEvent> it = metastoreEvents.iterator(); // filter out the create events which has a corresponding drop event later - int fromIndex = 0; + int sizeBefore = metastoreEvents.size(); int numFilteredEvents = 0; - int inputSize = metastoreEvents.size(); - while (it.hasNext()) { - MetastoreEvent current = 
it.next(); - if (fromIndex < metastoreEvents.size() && current.isRemovedAfter( - metastoreEvents.subList(fromIndex + 1, metastoreEvents.size()))) { - LOG.info(current.debugString("Filtering out this event since the object is " + int i = 0; + while (i < metastoreEvents.size()) { + MetastoreEvent currentEvent = metastoreEvents.get(i); + if (currentEvent.isRemovedAfter(metastoreEvents.subList(i + 1, + metastoreEvents.size()))) { + LOG.info(currentEvent.debugString("Filtering out this event since the object is " + "either removed or renamed later in the event stream")); - it.remove(); + metastoreEvents.remove(i); numFilteredEvents++; + } else { + i++; } - fromIndex++; } LOG.info(String.format("Total number of events received: %d Total number of events " - + "filtered out: %d", inputSize, numFilteredEvents)); + + "filtered out: %d", sizeBefore, numFilteredEvents)); return metastoreEvents; } } diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index 3a5e56a..f8ff8db 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -17,6 +17,11 @@ package org.apache.impala.catalog.events; +import static org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.ALTER_TABLE; +import static org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.CREATE_DATABASE; +import static org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.CREATE_TABLE; +import static org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.DROP_DATABASE; +import static org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.DROP_TABLE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -595,13 +600,13 @@ 
public class MetastoreEventsProcessorTest { assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus()); createDatabase(); eventsProcessor_.processEvents(); - createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false); + createTableFromImpala(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED, false); assertNotNull("Table should have been found after create table statement", catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)); loadTable(TEST_TABLE_NAME_NONPARTITIONED); dropTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED); // now catalogD does not have the table entry, create the table again - createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false); + createTableFromImpala(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED, false); assertNotNull("Table should have been found after create table statement", catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)); loadTable(TEST_TABLE_NAME_NONPARTITIONED); @@ -644,7 +649,7 @@ public class MetastoreEventsProcessorTest { @Test public void testTableEventsFromImpala() throws ImpalaException { createDatabaseFromImpala(TEST_DB_NAME, "created from Impala"); - createTableFromImpala(TEST_TABLE_NAME_PARTITIONED, true); + createTableFromImpala(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED, true); loadTable(TEST_TABLE_NAME_PARTITIONED); List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents(); assertEquals(2, events.size()); @@ -671,32 +676,63 @@ public class MetastoreEventsProcessorTest { * table should not create a in */ @Test - public void testEventFiltering() throws ImpalaException { + public void testEventFiltering() throws Exception { createDatabaseFromImpala(TEST_DB_NAME, ""); - createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false); + createTableFromImpala(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED, false); loadTable(TEST_TABLE_NAME_NONPARTITIONED); assertNotNull(catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)); 
dropTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED); // the create table event should be filtered out - List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents(); - assertEquals(3, events.size()); - List<MetastoreEvent> filteredEvents = - eventsProcessor_.getMetastoreEventFactory().getFilteredEvents(events); - assertEquals(2, filteredEvents.size()); - assertEquals(MetastoreEventType.CREATE_DATABASE, filteredEvents.get(0).eventType_); - assertEquals(MetastoreEventType.DROP_TABLE, filteredEvents.get(1).eventType_); - eventsProcessor_.processEvents(); - assertNull(catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)); + verifyFilterEvents(3, 2, Arrays.asList(CREATE_DATABASE, DROP_TABLE)); // test the table rename case - createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false); + createTableFromImpala(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED, false); renameTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, "new_name"); - events = eventsProcessor_.getNextMetastoreEvents(); - assertEquals(2, events.size()); - filteredEvents = + // create table gets filtered out since it was renamed immediated after + verifyFilterEvents(2, 1, Arrays.asList(ALTER_TABLE)); + + //cleanup + dropDatabaseCascadeFromImpala(TEST_DB_NAME); + eventsProcessor_.processEvents(); + + // test when multiple events can be filtered out + // create_db, create_tbl, drop_tbl, drop_db + createDatabaseFromImpala(TEST_DB_NAME, "desc"); + createTableFromImpala(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED, false); + loadTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED); + assertNotNull(catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED)); + dropTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED); + dropDatabaseCascadeFromImpala(TEST_DB_NAME); + verifyFilterEvents(4, 2, Arrays.asList(DROP_TABLE, DROP_DATABASE)); + + // create event stream s.t inverse events have gaps from their counterparts + createDatabase(); + // unrelated event + createTable("dummy", false); + 
createTable(TEST_TABLE_NAME_NONPARTITIONED, false); + // dummy events + alterTableAddParameter(TEST_TABLE_NAME_NONPARTITIONED, "paramkey", "paramVal"); + alterTableAddParameter(TEST_TABLE_NAME_NONPARTITIONED, "paramkey1", "paramVal2"); + dropTable(TEST_TABLE_NAME_NONPARTITIONED); + // this would generate drop_table for dummy table as well + dropDatabaseCascade(TEST_DB_NAME); + verifyFilterEvents(8, 5, Arrays.asList(ALTER_TABLE, ALTER_TABLE, DROP_TABLE, + DROP_TABLE, DROP_DATABASE)); + } + + private void verifyFilterEvents(int total, int numFiltered, + List<MetastoreEventType> expectedFilteredEventTypes) throws ImpalaException { + List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents(); + assertEquals(total, events.size()); + List<MetastoreEvent> filteredEvents = eventsProcessor_.getMetastoreEventFactory().getFilteredEvents(events); - assertEquals(1, filteredEvents.size()); - assertEquals(MetastoreEventType.ALTER_TABLE, filteredEvents.get(0).eventType_); + assertEquals(numFiltered, filteredEvents.size()); + int i = 0; + for (MetastoreEvent e : filteredEvents) { + assertEquals(expectedFilteredEventTypes.get(i++), e.eventType_); + } + eventsProcessor_.processEvents(); + assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus()); } /** @@ -712,7 +748,7 @@ public class MetastoreEventsProcessorTest { createDatabaseFromImpala(TEST_DB_NAME, "first"); assertNotNull("Db should have been found after create database statement", catalog_.getDb(TEST_DB_NAME)); - dropDatabaseFromImpala(TEST_DB_NAME); + dropDatabaseCascadeFromImpala(TEST_DB_NAME); assertNull(catalog_.getDb(TEST_DB_NAME)); createDatabaseFromImpala(TEST_DB_NAME, "second"); assertNotNull(catalog_.getDb(TEST_DB_NAME)); @@ -1112,7 +1148,7 @@ public class MetastoreEventsProcessorTest { } testDDLOpUsingEvent(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED, dbParams, - tblParams, MetastoreEventType.CREATE_DATABASE, shouldEventGetProcessed); + tblParams, CREATE_DATABASE, shouldEventGetProcessed); 
testDDLOpUsingEvent(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED, dbParams, tblParams, MetastoreEventType.ALTER_DATABASE, shouldEventGetProcessed); @@ -1427,18 +1463,19 @@ public class MetastoreEventsProcessorTest { /** * Drops db from Impala */ - private void dropDatabaseFromImpala(String dbName) throws ImpalaException { + private void dropDatabaseCascadeFromImpala(String dbName) throws ImpalaException { TDdlExecRequest req = new TDdlExecRequest(); req.setDdl_type(TDdlType.DROP_DATABASE); TDropDbParams dropDbParams = new TDropDbParams(); dropDbParams.setDb(dbName); + dropDbParams.setCascade(true); req.setDrop_db_params(dropDbParams); catalogOpExecutor_.execDdlRequest(req); } - private void createTableFromImpala(String tblName, boolean isPartitioned) + private void createTableFromImpala(String dbName, String tblName, boolean isPartitioned) throws ImpalaException { - createTableFromImpala(TEST_DB_NAME, tblName, null, isPartitioned); + createTableFromImpala(dbName, tblName, null, isPartitioned); } /** * Creates a table using CatalogOpExecutor to simulate a DDL operation from Impala