YARN-9040.  Fixed memory leak in LevelDBCacheTimelineStore and DBIterator.
            Contributed by Tarun Parimi


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/71e0b0d8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/71e0b0d8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/71e0b0d8

Branch: refs/heads/HDFS-12943
Commit: 71e0b0d8005ea1952dc7e582db15c2ac09df7c91
Parents: 346c0c8
Author: Eric Yang <ey...@apache.org>
Authored: Mon Dec 17 12:04:25 2018 -0500
Committer: Eric Yang <ey...@apache.org>
Committed: Mon Dec 17 12:04:25 2018 -0500

----------------------------------------------------------------------
 .../timeline/KeyValueBasedTimelineStore.java    | 98 ++++++++++----------
 .../server/timeline/MemoryTimelineStore.java    | 37 +++++++-
 .../timeline/TimelineStoreMapAdapter.java       |  9 +-
 .../timeline/LevelDBCacheTimelineStore.java     | 14 ++-
 4 files changed, 99 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/71e0b0d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java
index 82db770..c24d904 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java
@@ -42,7 +42,6 @@ import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -50,6 +49,7 @@ import java.util.Set;
 import java.util.SortedSet;
 
 import static 
org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
+import static 
org.apache.hadoop.yarn.server.timeline.TimelineStoreMapAdapter.CloseableIterator;
 
 /**
  * Map based implementation of {@link TimelineStore}. A hash map
@@ -114,66 +114,67 @@ abstract class KeyValueBasedTimelineStore
       fields = EnumSet.allOf(Field.class);
     }
 
-    Iterator<TimelineEntity> entityIterator = null;
+    TimelineEntity firstEntity = null;
     if (fromId != null) {
-      TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
+      firstEntity = entities.get(new EntityIdentifier(fromId,
           entityType));
       if (firstEntity == null) {
         return new TimelineEntities();
-      } else {
-        entityIterator = entities.valueSetIterator(firstEntity);
       }
     }
-    if (entityIterator == null) {
-      entityIterator = entities.valueSetIterator();
-    }
 
     List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
-    while (entityIterator.hasNext()) {
-      TimelineEntity entity = entityIterator.next();
-      if (entitiesSelected.size() >= limit) {
-        break;
-      }
-      if (!entity.getEntityType().equals(entityType)) {
-        continue;
-      }
-      if (entity.getStartTime() <= windowStart) {
-        continue;
-      }
-      if (entity.getStartTime() > windowEnd) {
-        continue;
-      }
-      if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
-          entity.getEntityId(), entity.getEntityType())) > fromTs) {
-        continue;
-      }
-      if (primaryFilter != null &&
-          !KeyValueBasedTimelineStoreUtils.matchPrimaryFilter(
-              entity.getPrimaryFilters(), primaryFilter)) {
-        continue;
-      }
-      if (secondaryFilters != null) { // AND logic
-        boolean flag = true;
-        for (NameValuePair secondaryFilter : secondaryFilters) {
-          if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils
-              .matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter)
-              && !KeyValueBasedTimelineStoreUtils.matchFilter(
-              entity.getOtherInfo(), secondaryFilter)) {
-            flag = false;
-            break;
-          }
+
+    try(CloseableIterator<TimelineEntity> entityIterator =
+        firstEntity == null ? entities.valueSetIterator() :
+            entities.valueSetIterator(firstEntity)) {
+      while (entityIterator.hasNext()) {
+        TimelineEntity entity = entityIterator.next();
+        if (entitiesSelected.size() >= limit) {
+          break;
         }
-        if (!flag) {
+        if (!entity.getEntityType().equals(entityType)) {
           continue;
         }
-      }
-      if (entity.getDomainId() == null) {
-        entity.setDomainId(DEFAULT_DOMAIN_ID);
-      }
-      if (checkAcl == null || checkAcl.check(entity)) {
-        entitiesSelected.add(entity);
+        if (entity.getStartTime() <= windowStart) {
+          continue;
+        }
+        if (entity.getStartTime() > windowEnd) {
+          continue;
+        }
+        if (fromTs != null && entityInsertTimes.get(
+            new EntityIdentifier(entity.getEntityId(), entity.getEntityType()))
+            > fromTs) {
+          continue;
+        }
+        if (primaryFilter != null && !KeyValueBasedTimelineStoreUtils
+            .matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
+          continue;
+        }
+        if (secondaryFilters != null) { // AND logic
+          boolean flag = true;
+          for (NameValuePair secondaryFilter : secondaryFilters) {
+            if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils
+                .matchPrimaryFilter(entity.getPrimaryFilters(), 
secondaryFilter)
+                && !KeyValueBasedTimelineStoreUtils
+                .matchFilter(entity.getOtherInfo(), secondaryFilter)) {
+              flag = false;
+              break;
+            }
+          }
+          if (!flag) {
+            continue;
+          }
+        }
+        if (entity.getDomainId() == null) {
+          entity.setDomainId(DEFAULT_DOMAIN_ID);
+        }
+        if (checkAcl == null || checkAcl.check(entity)) {
+          entitiesSelected.add(entity);
+        }
       }
     }
+
     List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
     for (TimelineEntity entitySelected : entitiesSelected) {
       entitiesToReturn.add(KeyValueBasedTimelineStoreUtils.maskFields(
@@ -569,6 +570,7 @@ abstract class KeyValueBasedTimelineStore
       }
       return o;
     }
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71e0b0d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java
index 5c2db00..f4aea45 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.timeline;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -59,14 +60,15 @@ public class MemoryTimelineStore extends 
KeyValueBasedTimelineStore {
     }
 
     @Override
-    public Iterator<V>
+    public CloseableIterator<V>
     valueSetIterator() {
-      return new TreeSet<>(internalMap.values()).iterator();
+      return wrapClosableIterator(new TreeSet<>(internalMap.values())
+          .iterator());
     }
 
     @Override
     @SuppressWarnings("unchecked")
-    public Iterator<V> valueSetIterator(V minV) {
+    public CloseableIterator<V> valueSetIterator(V minV) {
       if (minV instanceof Comparable) {
         TreeSet<V> tempTreeSet = new TreeSet<>();
         for (V value : internalMap.values()) {
@@ -74,11 +76,38 @@ public class MemoryTimelineStore extends 
KeyValueBasedTimelineStore {
             tempTreeSet.add(value);
           }
         }
-        return tempTreeSet.iterator();
+        return wrapClosableIterator(tempTreeSet.iterator());
       } else {
         return valueSetIterator();
       }
     }
+
+    private CloseableIterator<V> wrapClosableIterator(
+        final Iterator<V> iterator) {
+      return new CloseableIterator<V>() {
+        private final Iterator<V> internalIterator = iterator;
+        @Override
+        public void close() throws IOException {
+          // Not implemented
+        }
+
+        @Override
+        public boolean hasNext() {
+          return internalIterator.hasNext();
+        }
+
+        @Override
+        public V next() {
+          return internalIterator.next();
+        }
+
+        @Override
+        public void remove() {
+          internalIterator.remove();
+        }
+      };
+
+    }
   }
 
   public MemoryTimelineStore() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71e0b0d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java
index 175ed0b..38fc28e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.timeline;
 
+import java.io.Closeable;
 import java.util.Iterator;
 
 /**
@@ -48,7 +49,7 @@ interface TimelineStoreMapAdapter<K, V> {
   /**
    * @return the iterator of the value set of the map
    */
-  Iterator<V> valueSetIterator();
+  CloseableIterator<V> valueSetIterator();
 
   /**
    * Return the iterator of the value set of the map, starting from minV if 
type
@@ -56,5 +57,9 @@ interface TimelineStoreMapAdapter<K, V> {
    * @param minV
    * @return
    */
-  Iterator<V> valueSetIterator(V minV);
+  CloseableIterator<V> valueSetIterator(V minV);
+
+  interface CloseableIterator<V> extends Iterator<V>, Closeable {}
 }
+
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71e0b0d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java
index f7a3d01..9b1ffdc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -211,18 +210,18 @@ public class LevelDBCacheTimelineStore extends 
KeyValueBasedTimelineStore {
     }
 
     @Override
-    public Iterator<V> valueSetIterator() {
+    public CloseableIterator<V> valueSetIterator() {
       return getIterator(null, Long.MAX_VALUE);
     }
 
     @Override
-    public Iterator<V> valueSetIterator(V minV) {
+    public CloseableIterator<V> valueSetIterator(V minV) {
       return getIterator(
           new EntityIdentifier(minV.getEntityId(), minV.getEntityType()),
           minV.getStartTime());
     }
 
-    private Iterator<V> getIterator(
+    private CloseableIterator<V> getIterator(
         EntityIdentifier startId, long startTimeMax) {
 
       final DBIterator internalDbIterator = entityDb.iterator();
@@ -247,7 +246,7 @@ public class LevelDBCacheTimelineStore extends 
KeyValueBasedTimelineStore {
           = entityPrefixKeyBuilder.getBytesForLookup();
       internalDbIterator.seek(startPrefixBytes);
 
-      return new Iterator<V>() {
+      return new CloseableIterator<V>() {
         @Override
         public boolean hasNext() {
           if (!internalDbIterator.hasNext()) {
@@ -284,6 +283,11 @@ public class LevelDBCacheTimelineStore extends 
KeyValueBasedTimelineStore {
           LOG.error("LevelDB map adapter does not support iterate-and-remove"
               + " use cases. ");
         }
+
+        @Override
+        public void close() throws IOException {
+          internalDbIterator.close();
+        }
       };
     }
     static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to