This is an automated email from the ASF dual-hosted git repository. prasanthj pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 3a8edc0 HIVE-22290: ObjectStore.cleanWriteNotificationEvents and ObjectStore.cleanupEvents OutOfMemory on large number of pending events (#1484) 3a8edc0 is described below commit 3a8edc02f542d1dc7c6f715e4b7f11e30bf65c83 Author: Naresh P R <prnaresh.nar...@gmail.com> AuthorDate: Tue Sep 15 20:05:37 2020 -0700 HIVE-22290: ObjectStore.cleanWriteNotificationEvents and ObjectStore.cleanupEvents OutOfMemory on large number of pending events (#1484) --- .../apache/hadoop/hive/metastore/ObjectStore.java | 81 ++++++++++++++++------ 1 file changed, 60 insertions(+), 21 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index f866b94..f12ce84 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -10426,9 +10426,41 @@ public class ObjectStore implements RawStore, Configurable { int tooOld = (tmp > Integer.MAX_VALUE) ? 0 : (int) tmp; query = pm.newQuery(MTxnWriteNotificationLog.class, "eventTime < tooOld"); query.declareParameters("java.lang.Integer tooOld"); - Collection<MTxnWriteNotificationLog> toBeRemoved = (Collection) query.execute(tooOld); - if (CollectionUtils.isNotEmpty(toBeRemoved)) { + + int max_events = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.EVENT_CLEAN_MAX_EVENTS); + max_events = max_events > 0 ? max_events : Integer.MAX_VALUE; + query.setRange(0, max_events); + query.setOrdering("txnId ascending"); + + List<MTxnWriteNotificationLog> toBeRemoved = (List) query.execute(tooOld); + int iteration = 0; + int eventCount = 0; + long minTxnId = 0; + long minEventTime = 0; + long maxTxnId = 0; + long maxEventTime = 0; + while (CollectionUtils.isNotEmpty(toBeRemoved)) { + int listSize = toBeRemoved.size(); + if (iteration == 0) { + MTxnWriteNotificationLog firstNotification = toBeRemoved.get(0); + minTxnId = firstNotification.getTxnId(); + minEventTime = firstNotification.getEventTime(); + } + MTxnWriteNotificationLog lastNotification = toBeRemoved.get(listSize - 1); + maxTxnId = lastNotification.getTxnId(); + maxEventTime = lastNotification.getEventTime(); + pm.deletePersistentAll(toBeRemoved); + eventCount += listSize; + iteration++; + toBeRemoved = (List) query.execute(tooOld); + } + if (iteration == 0) { + LOG.info("No WriteNotification events found to be cleaned with eventTime < {}.", tooOld); + } else { + LOG.info("WriteNotification Cleaned {} events with eventTime < {} in {} iteration, " + + "minimum txnId {} (with eventTime {}) and maximum txnId {} (with eventTime {})", + eventCount, tooOld, iteration, minTxnId, minEventTime, maxTxnId, maxEventTime); } commited = commitTransaction(); } finally { @@ -10617,26 +10649,33 @@ public class ObjectStore implements RawStore, Configurable { query.setOrdering("eventId ascending"); List<MNotificationLog> toBeRemoved = (List) query.execute(tooOld); - if (toBeRemoved == null || toBeRemoved.size() == 0) { - LOG.info("No events found to be cleaned with eventTime < {}.", tooOld); - } else { - NotificationEvent firstEvent = translateDbToThrift(toBeRemoved.get(0)); - long minEventId = firstEvent.getEventId(); - long minEventTime = firstEvent.getEventTime(); - long maxEventId = minEventId; - long maxEventTime = minEventTime; - if (toBeRemoved.size() > 1) { - NotificationEvent lastEvent = - translateDbToThrift(toBeRemoved.get(toBeRemoved.size() - 1)); - maxEventId = lastEvent.getEventId(); - maxEventTime = lastEvent.getEventTime(); - } - LOG.info("Cleaned {} events with eventTime < {}, minimum eventId {} (with eventTime {}) " + - "and maximum eventId {} (with eventTime {})", - toBeRemoved.size(), tooOld, minEventId, minEventTime, maxEventId, maxEventTime); - } - if (CollectionUtils.isNotEmpty(toBeRemoved)) { + int iteration = 0; + int eventCount = 0; + long minEventId = 0; + long minEventTime = 0; + long maxEventId = 0; + long maxEventTime = 0; + while (CollectionUtils.isNotEmpty(toBeRemoved)) { + int listSize = toBeRemoved.size(); + if (iteration == 0) { + MNotificationLog firstNotification = toBeRemoved.get(0); + minEventId = firstNotification.getEventId(); + minEventTime = firstNotification.getEventTime(); + } + MNotificationLog lastNotification = toBeRemoved.get(listSize - 1); + maxEventId = lastNotification.getEventId(); + maxEventTime = lastNotification.getEventTime(); pm.deletePersistentAll(toBeRemoved); + eventCount += listSize; + iteration++; + toBeRemoved = (List) query.execute(tooOld); + } + if (iteration == 0) { + LOG.info("No Notification events found to be cleaned with eventTime < {}.", tooOld); + } else { + LOG.info("Notification Cleaned {} events with eventTime < {} in {} iteration, " + + "minimum eventId {} (with eventTime {}) and maximum eventId {} (with eventTime {})", + eventCount, tooOld, iteration, minEventId, minEventTime, maxEventId, maxEventTime); } commited = commitTransaction(); } finally {