This is an automated email from the ASF dual-hosted git repository.

prasanthj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a8edc0  HIVE-22290: ObjectStore.cleanWriteNotificationEvents and 
ObjectStore.cleanupEvents OutOfMemory on large number of pending events (#1484)
3a8edc0 is described below

commit 3a8edc02f542d1dc7c6f715e4b7f11e30bf65c83
Author: Naresh P R <prnaresh.nar...@gmail.com>
AuthorDate: Tue Sep 15 20:05:37 2020 -0700

    HIVE-22290: ObjectStore.cleanWriteNotificationEvents and 
ObjectStore.cleanupEvents OutOfMemory on large number of pending events (#1484)
---
 .../apache/hadoop/hive/metastore/ObjectStore.java  | 81 ++++++++++++++++------
 1 file changed, 60 insertions(+), 21 deletions(-)

diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index f866b94..f12ce84 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -10426,9 +10426,41 @@ public class ObjectStore implements RawStore, 
Configurable {
       int tooOld = (tmp > Integer.MAX_VALUE) ? 0 : (int) tmp;
       query = pm.newQuery(MTxnWriteNotificationLog.class, "eventTime < 
tooOld");
       query.declareParameters("java.lang.Integer tooOld");
-      Collection<MTxnWriteNotificationLog> toBeRemoved = (Collection) 
query.execute(tooOld);
-      if (CollectionUtils.isNotEmpty(toBeRemoved)) {
+
+      int max_events = MetastoreConf.getIntVar(conf, 
MetastoreConf.ConfVars.EVENT_CLEAN_MAX_EVENTS);
+      max_events = max_events > 0 ? max_events : Integer.MAX_VALUE;
+      query.setRange(0, max_events);
+      query.setOrdering("txnId ascending");
+
+      List<MTxnWriteNotificationLog> toBeRemoved = (List) 
query.execute(tooOld);
+      int iteration = 0;
+      int eventCount = 0;
+      long minTxnId = 0;
+      long minEventTime = 0;
+      long maxTxnId = 0;
+      long maxEventTime = 0;
+      while (CollectionUtils.isNotEmpty(toBeRemoved)) {
+        int listSize = toBeRemoved.size();
+        if (iteration == 0) {
+          MTxnWriteNotificationLog firstNotification = toBeRemoved.get(0);
+          minTxnId = firstNotification.getTxnId();
+          minEventTime = firstNotification.getEventTime();
+        }
+        MTxnWriteNotificationLog lastNotification = toBeRemoved.get(listSize - 
1);
+        maxTxnId = lastNotification.getTxnId();
+        maxEventTime = lastNotification.getEventTime();
+
         pm.deletePersistentAll(toBeRemoved);
+        eventCount += listSize;
+        iteration++;
+        toBeRemoved = (List) query.execute(tooOld);
+      }
+      if (iteration == 0) {
+        LOG.info("No WriteNotification events found to be cleaned with 
eventTime < {}.", tooOld);
+      } else {
+        LOG.info("WriteNotification Cleaned {} events with eventTime < {} in 
{} iteration, " +
+            "minimum txnId {} (with eventTime {}) and maximum txnId {} (with 
eventTime {})",
+            eventCount, tooOld, iteration, minTxnId, minEventTime, maxTxnId, 
maxEventTime);
       }
       commited = commitTransaction();
     } finally {
@@ -10617,26 +10649,33 @@ public class ObjectStore implements RawStore, 
Configurable {
       query.setOrdering("eventId ascending");
 
       List<MNotificationLog> toBeRemoved = (List) query.execute(tooOld);
-      if (toBeRemoved == null || toBeRemoved.size() == 0) {
-        LOG.info("No events found to be cleaned with eventTime < {}.", tooOld);
-      } else {
-        NotificationEvent firstEvent = translateDbToThrift(toBeRemoved.get(0));
-        long minEventId = firstEvent.getEventId();
-        long minEventTime = firstEvent.getEventTime();
-        long maxEventId = minEventId;
-        long maxEventTime = minEventTime;
-        if (toBeRemoved.size() > 1) {
-          NotificationEvent lastEvent =
-                  translateDbToThrift(toBeRemoved.get(toBeRemoved.size() - 1));
-          maxEventId = lastEvent.getEventId();
-          maxEventTime = lastEvent.getEventTime();
-        }
-        LOG.info("Cleaned {} events with eventTime < {}, minimum eventId {} 
(with eventTime {}) " +
-                        "and maximum eventId {} (with eventTime {})",
-                toBeRemoved.size(), tooOld, minEventId, minEventTime, 
maxEventId, maxEventTime);
-      }
-      if (CollectionUtils.isNotEmpty(toBeRemoved)) {
+      int iteration = 0;
+      int eventCount = 0;
+      long minEventId = 0;
+      long minEventTime = 0;
+      long maxEventId = 0;
+      long maxEventTime = 0;
+      while (CollectionUtils.isNotEmpty(toBeRemoved)) {
+        int listSize = toBeRemoved.size();
+        if (iteration == 0) {
+          MNotificationLog firstNotification = toBeRemoved.get(0);
+          minEventId = firstNotification.getEventId();
+          minEventTime = firstNotification.getEventTime();
+        }
+        MNotificationLog lastNotification = toBeRemoved.get(listSize - 1);
+        maxEventId = lastNotification.getEventId();
+        maxEventTime = lastNotification.getEventTime();
         pm.deletePersistentAll(toBeRemoved);
+        eventCount += listSize;
+        iteration++;
+        toBeRemoved = (List) query.execute(tooOld);
+      }
+      if (iteration == 0) {
+        LOG.info("No Notification events found to be cleaned with eventTime < 
{}.", tooOld);
+      } else {
+        LOG.info("Notification Cleaned {} events with eventTime < {} in {} 
iteration, " +
+            "minimum eventId {} (with eventTime {}) and maximum eventId {} 
(with eventTime {})",
+            eventCount, tooOld, iteration, minEventId, minEventTime, 
maxEventId, maxEventTime);
       }
       commited = commitTransaction();
     } finally {

Reply via email to