HIVE-12937 : DbNotificationListener unable to clean up old notification events (Sushanth Sowmyan, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1de97bc5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1de97bc5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1de97bc5

Branch: refs/heads/llap
Commit: 1de97bc5fad323ae3bd48ebb39e6e68a3581e099
Parents: 8c8ff3f
Author: Sushanth Sowmyan <khorg...@gmail.com>
Authored: Tue Mar 29 11:21:23 2016 -0700
Committer: Sushanth Sowmyan <khorg...@gmail.com>
Committed: Tue Mar 29 11:24:36 2016 -0700

----------------------------------------------------------------------
 .../listener/TestDbNotificationListener.java | 18 ++++++++++++++++++
 .../apache/hadoop/hive/metastore/ObjectStore.java | 2 +-
 2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1de97bc5/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 6caf3fe..1360563 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -57,6 +57,7 @@ import java.util.Map;
 public class TestDbNotificationListener {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class.getName());
+  private static final int EVENTS_TTL = 30;
   private static Map<String, String> emptyParameters = new HashMap<String, String>();
   private static IMetaStoreClient msClient;
   private static Driver driver;
@@ -68,6 +69,7 @@ public class TestDbNotificationListener {
     HiveConf conf = new HiveConf();
     conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
         DbNotificationListener.class.getName());
+    conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, String.valueOf(EVENTS_TTL)+"s");
     conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
     conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
@@ -565,4 +567,20 @@ public class TestDbNotificationListener {
     assertEquals(firstEventId + 19, event.getEventId());
     assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType());
   }
+
+  @Test
+  public void cleanupNotifs() throws Exception {
+    Database db = new Database("cleanup1","no description","file:/tmp", emptyParameters);
+    msClient.createDatabase(db);
+    msClient.dropDatabase("cleanup1");
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(2, rsp.getEventsSize());
+
+    // sleep for expiry time, and then fetch again
+    Thread.sleep(EVENTS_TTL * 2 * 1000); // sleep twice the TTL interval - things should have been cleaned by then.
+
+    NotificationEventResponse rsp2 = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(0, rsp2.getEventsSize());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1de97bc5/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 35adb39..ac293b9 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -7827,7 +7827,7 @@ public class ObjectStore implements RawStore, Configurable {
       query.declareParameters("java.lang.Integer tooOld");
       Collection<MNotificationLog> toBeRemoved = (Collection) query.execute(tooOld);
       if (toBeRemoved != null && toBeRemoved.size() > 0) {
-        pm.deletePersistent(toBeRemoved);
+        pm.deletePersistentAll(toBeRemoved);
       }
       commited = commitTransaction();
     } finally {