This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 60ca5f22eb0f880a1ade3a3888231ce9b2c04b1c
Author: Vihang Karajgaonkar <vih...@cloudera.com>
AuthorDate: Thu Feb 28 17:48:25 2019 -0800

    IMPALA-8266 : Event filtering logic may not filter all the events
    
    This patch fixes a bug in event filtering logic. The bug shows up when
    atleast one event is filtered out and then a inverse event immediately
    shows up after a create_table or create_database event. For example,
    consider a event stream has following sequence create_database, 
create_table,
    drop_table, drop_database. In such a case only the first create_database
    gets filtered out instead of both the create_database and create_table
    event. This leads to a exception while processing create_table since the
    database creation is skipped.
    
    Testing done:
    1. Adds additional cases in the existing test which generates such
    sequence of events.
    
    Change-Id: Iaeaa26017ee223cca18344e5e1d6ace87200fd9c
    Reviewed-on: http://gerrit.cloudera.org:8080/12641
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../impala/catalog/events/MetastoreEvents.java     | 21 +++---
 .../events/MetastoreEventsProcessorTest.java       | 87 +++++++++++++++-------
 2 files changed, 73 insertions(+), 35 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index f94ddc3..967d5e1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -173,22 +173,23 @@ public class MetastoreEvents {
       }
       Iterator<MetastoreEvent> it = metastoreEvents.iterator();
       // filter out the create events which has a corresponding drop event 
later
-      int fromIndex = 0;
+      int sizeBefore = metastoreEvents.size();
       int numFilteredEvents = 0;
-      int inputSize = metastoreEvents.size();
-      while (it.hasNext()) {
-        MetastoreEvent current = it.next();
-        if (fromIndex < metastoreEvents.size() && current.isRemovedAfter(
-            metastoreEvents.subList(fromIndex + 1, metastoreEvents.size()))) {
-          LOG.info(current.debugString("Filtering out this event since the 
object is "
+      int i = 0;
+      while (i < metastoreEvents.size()) {
+        MetastoreEvent currentEvent = metastoreEvents.get(i);
+        if (currentEvent.isRemovedAfter(metastoreEvents.subList(i + 1,
+            metastoreEvents.size()))) {
+          LOG.info(currentEvent.debugString("Filtering out this event since 
the object is "
               + "either removed or renamed later in the event stream"));
-          it.remove();
+          metastoreEvents.remove(i);
           numFilteredEvents++;
+        } else {
+          i++;
         }
-        fromIndex++;
       }
       LOG.info(String.format("Total number of events received: %d Total number 
of events "
-          + "filtered out: %d", inputSize, numFilteredEvents));
+          + "filtered out: %d", sizeBefore, numFilteredEvents));
       return metastoreEvents;
     }
   }
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 3a5e56a..f8ff8db 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.impala.catalog.events;
 
+import static 
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.ALTER_TABLE;
+import static 
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.CREATE_DATABASE;
+import static 
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.CREATE_TABLE;
+import static 
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.DROP_DATABASE;
+import static 
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.DROP_TABLE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -595,13 +600,13 @@ public class MetastoreEventsProcessorTest {
     assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
     createDatabase();
     eventsProcessor_.processEvents();
-    createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false);
+    createTableFromImpala(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED, false);
     assertNotNull("Table should have been found after create table statement",
         catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
     loadTable(TEST_TABLE_NAME_NONPARTITIONED);
     dropTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED);
     // now catalogD does not have the table entry, create the table again
-    createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false);
+    createTableFromImpala(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED, false);
     assertNotNull("Table should have been found after create table statement",
         catalog_.getTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED));
     loadTable(TEST_TABLE_NAME_NONPARTITIONED);
@@ -644,7 +649,7 @@ public class MetastoreEventsProcessorTest {
   @Test
   public void testTableEventsFromImpala() throws ImpalaException {
     createDatabaseFromImpala(TEST_DB_NAME, "created from Impala");
-    createTableFromImpala(TEST_TABLE_NAME_PARTITIONED, true);
+    createTableFromImpala(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED, true);
     loadTable(TEST_TABLE_NAME_PARTITIONED);
     List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
     assertEquals(2, events.size());
@@ -671,32 +676,63 @@ public class MetastoreEventsProcessorTest {
    * table should not create a in
    */
   @Test
-  public void testEventFiltering() throws ImpalaException {
+  public void testEventFiltering() throws Exception {
     createDatabaseFromImpala(TEST_DB_NAME, "");
-    createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false);
+    createTableFromImpala(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED, false);
     loadTable(TEST_TABLE_NAME_NONPARTITIONED);
     assertNotNull(catalog_.getTable(TEST_DB_NAME, 
TEST_TABLE_NAME_NONPARTITIONED));
     dropTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED);
     // the create table event should be filtered out
-    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
-    assertEquals(3, events.size());
-    List<MetastoreEvent> filteredEvents =
-        eventsProcessor_.getMetastoreEventFactory().getFilteredEvents(events);
-    assertEquals(2, filteredEvents.size());
-    assertEquals(MetastoreEventType.CREATE_DATABASE, 
filteredEvents.get(0).eventType_);
-    assertEquals(MetastoreEventType.DROP_TABLE, 
filteredEvents.get(1).eventType_);
-    eventsProcessor_.processEvents();
-    assertNull(catalog_.getTable(TEST_DB_NAME, 
TEST_TABLE_NAME_NONPARTITIONED));
+    verifyFilterEvents(3, 2, Arrays.asList(CREATE_DATABASE, DROP_TABLE));
 
     // test the table rename case
-    createTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, false);
+    createTableFromImpala(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED, false);
     renameTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED, "new_name");
-    events = eventsProcessor_.getNextMetastoreEvents();
-    assertEquals(2, events.size());
-    filteredEvents =
+    // create table gets filtered out since it was renamed immediated after
+    verifyFilterEvents(2, 1, Arrays.asList(ALTER_TABLE));
+
+    //cleanup
+    dropDatabaseCascadeFromImpala(TEST_DB_NAME);
+    eventsProcessor_.processEvents();
+
+    // test when multiple events can be filtered out
+    // create_db, create_tbl, drop_tbl, drop_db
+    createDatabaseFromImpala(TEST_DB_NAME, "desc");
+    createTableFromImpala(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED, false);
+    loadTable(TEST_DB_NAME, TEST_TABLE_NAME_NONPARTITIONED);
+    assertNotNull(catalog_.getTable(TEST_DB_NAME, 
TEST_TABLE_NAME_NONPARTITIONED));
+    dropTableFromImpala(TEST_TABLE_NAME_NONPARTITIONED);
+    dropDatabaseCascadeFromImpala(TEST_DB_NAME);
+    verifyFilterEvents(4, 2, Arrays.asList(DROP_TABLE, DROP_DATABASE));
+
+    // create event stream s.t inverse events have gaps from their counterparts
+    createDatabase();
+    // unrelated event
+    createTable("dummy", false);
+    createTable(TEST_TABLE_NAME_NONPARTITIONED, false);
+    // dummy events
+    alterTableAddParameter(TEST_TABLE_NAME_NONPARTITIONED, "paramkey", 
"paramVal");
+    alterTableAddParameter(TEST_TABLE_NAME_NONPARTITIONED, "paramkey1", 
"paramVal2");
+    dropTable(TEST_TABLE_NAME_NONPARTITIONED);
+    // this would generate drop_table for dummy table as well
+    dropDatabaseCascade(TEST_DB_NAME);
+    verifyFilterEvents(8, 5, Arrays.asList(ALTER_TABLE, ALTER_TABLE, 
DROP_TABLE,
+        DROP_TABLE, DROP_DATABASE));
+  }
+
+  private void verifyFilterEvents(int total, int numFiltered,
+      List<MetastoreEventType> expectedFilteredEventTypes) throws 
ImpalaException {
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    assertEquals(total, events.size());
+    List<MetastoreEvent> filteredEvents =
         eventsProcessor_.getMetastoreEventFactory().getFilteredEvents(events);
-    assertEquals(1, filteredEvents.size());
-    assertEquals(MetastoreEventType.ALTER_TABLE, 
filteredEvents.get(0).eventType_);
+    assertEquals(numFiltered, filteredEvents.size());
+    int i = 0;
+    for (MetastoreEvent e : filteredEvents) {
+      assertEquals(expectedFilteredEventTypes.get(i++), e.eventType_);
+    }
+    eventsProcessor_.processEvents();
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
   }
 
   /**
@@ -712,7 +748,7 @@ public class MetastoreEventsProcessorTest {
     createDatabaseFromImpala(TEST_DB_NAME, "first");
     assertNotNull("Db should have been found after create database statement",
         catalog_.getDb(TEST_DB_NAME));
-    dropDatabaseFromImpala(TEST_DB_NAME);
+    dropDatabaseCascadeFromImpala(TEST_DB_NAME);
     assertNull(catalog_.getDb(TEST_DB_NAME));
     createDatabaseFromImpala(TEST_DB_NAME, "second");
     assertNotNull(catalog_.getDb(TEST_DB_NAME));
@@ -1112,7 +1148,7 @@ public class MetastoreEventsProcessorTest {
     }
 
     testDDLOpUsingEvent(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED, dbParams,
-        tblParams, MetastoreEventType.CREATE_DATABASE, 
shouldEventGetProcessed);
+        tblParams, CREATE_DATABASE, shouldEventGetProcessed);
     testDDLOpUsingEvent(TEST_DB_NAME, TEST_TABLE_NAME_PARTITIONED, dbParams,
         tblParams, MetastoreEventType.ALTER_DATABASE, shouldEventGetProcessed);
 
@@ -1427,18 +1463,19 @@ public class MetastoreEventsProcessorTest {
   /**
    * Drops db from Impala
    */
-  private void dropDatabaseFromImpala(String dbName) throws ImpalaException {
+  private void dropDatabaseCascadeFromImpala(String dbName) throws 
ImpalaException {
     TDdlExecRequest req = new TDdlExecRequest();
     req.setDdl_type(TDdlType.DROP_DATABASE);
     TDropDbParams dropDbParams = new TDropDbParams();
     dropDbParams.setDb(dbName);
+    dropDbParams.setCascade(true);
     req.setDrop_db_params(dropDbParams);
     catalogOpExecutor_.execDdlRequest(req);
   }
 
-  private void createTableFromImpala(String tblName, boolean isPartitioned)
+  private void createTableFromImpala(String dbName, String tblName, boolean 
isPartitioned)
       throws ImpalaException {
-    createTableFromImpala(TEST_DB_NAME, tblName, null, isPartitioned);
+    createTableFromImpala(dbName, tblName, null, isPartitioned);
   }
   /**
    * Creates a table using CatalogOpExecutor to simulate a DDL operation from 
Impala

Reply via email to