Author: olga
Date: Thu Jun 26 12:03:13 2008
New Revision: 671987

URL: http://svn.apache.org/viewvc?rev=671987&view=rev
Log:
PIG-235: memory management improvement

Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=671987&r1=671986&r2=671987&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Jun 26 12:03:13 2008
@@ -331,3 +331,5 @@
     PIG-243: make unit tests work on windows (daijy via olgan)
 
     PIG-271: added tutorial to SVN (olgan)
+
+    PIG-235: memory management improvements (pkamath via olgan)

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=671987&r1=671986&r2=671987&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java 
(original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java 
Thu Jun 26 12:03:13 2008
@@ -35,9 +35,26 @@
     
     LinkedList<WeakReference<Spillable>> spillables = new 
LinkedList<WeakReference<Spillable>>();
     
-    private static long gcActivationSize = Long.MAX_VALUE ;
-    private static long spillFileSizeThreshold = 0L ;
-    
+    // if we freed at least this much, invoke GC 
+    // (default 40 MB - this can be overridden by user supplied property)
+    private static long gcActivationSize = 40000000L ;
+    
+    // spill file size should be at least this much
+    // (default 5MB - this can be overridden by user supplied property)
+    private static long spillFileSizeThreshold = 5000000L ;
+    
+    // this will keep track of memory freed across spills
+    // and between GC invocations
+    private static long accumulatedFreeSize = 0L;
+    
+    // fraction of biggest heap for which we want to get
+    // "memory usage threshold exceeded" notifications
+    private static double memoryThresholdFraction = 0.7;
+    
+    // fraction of biggest heap for which we want to get
+    // "collection threshold exceeded" notifications
+    private static double collectionMemoryThresholdFraction = 0.5;
+        
     public SpillableMemoryManager() {
         
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this,
 null, null);
         List<MemoryPoolMXBean> mpbeans = 
ManagementFactory.getMemoryPoolMXBeans();
@@ -62,9 +79,22 @@
         }
         log.debug("Selected heap to monitor (" +
             biggestHeap.getName() + ")");
+        
+        // we want to set both collection and usage threshold alerts to be 
+        // safe. In some local tests after a point only collection threshold
+        // notifications were being sent though usage threshold notifications
+        // were sent early on. So using both would ensure that
+        // 1) we get notified early (through usage threshold exceeded 
notifications)
+        // 2) we get notified always when threshold is exceeded (either usage 
or
+        //    collection)
+        
         /* We set the threshold to be 50% of tenured since that is where
          * the GC starts to dominate CPU time according to Sun doc */
-        biggestHeap.setCollectionUsageThreshold((long)(biggestSize*.5));    
+        biggestHeap.setCollectionUsageThreshold((long)(biggestSize * 
collectionMemoryThresholdFraction));
+        // we set a higher threshold for usage threshold exceeded notification
+        // since this is more likely to be effective sooner and we do not
+        // want to be spilling too soon
+        biggestHeap.setUsageThreshold((long)(biggestSize * 
memoryThresholdFraction));
     }
     
     public static void configure(Properties properties) {
@@ -72,6 +102,7 @@
         try {
             spillFileSizeThreshold = Long.parseLong(
                         (String) 
properties.getProperty("pig.spill.size.threshold") ) ;
+            
             gcActivationSize = Long.parseLong(
                     (String) 
properties.getProperty("pig.spill.gc.activation.size") ) ;
         } 
@@ -79,14 +110,26 @@
             throw new RuntimeException("Error while converting system 
configurations" +
                        "spill.size.threshold, spill.gc.activation.size", nfe) ;
         }
-          
     }
     
     public void handleNotification(Notification n, Object o) {
         CompositeData cd = (CompositeData) n.getUserData();
         MemoryNotificationInfo info = MemoryNotificationInfo.from(cd);
-        log.info("low memory handler called " + info.getUsage());
-        long toFree = info.getUsage().getUsed() - 
(long)(info.getUsage().getMax()*.5);
+        
+        // free the amount by which usage exceeds the threshold, plus a
+        // further half of the threshold - so if threshold = heapmax/2, we
+        // will be trying to free used - heapmax/2 + heapmax/4
+        long toFree = 0L;
+        
if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
+            long threshold = (long)(info.getUsage().getMax() * 
memoryThresholdFraction);
+            toFree = info.getUsage().getUsed() - threshold + (long)(threshold 
* 0.5);
+            log.info("low memory handler called (Usage threshold exceeded) " + 
info.getUsage());
+        } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
+            long threshold = (long)(info.getUsage().getMax() * 
collectionMemoryThresholdFraction);
+            toFree = info.getUsage().getUsed() - threshold + (long)(threshold 
* 0.5);
+            log.info("low memory handler called (Collection threshold 
exceeded) " + info.getUsage());
+        }
+         
         if (toFree < 0) {
             log.debug("low memory handler returning " + 
                 "because there is nothing to free");
@@ -100,13 +143,15 @@
                 Spillable s = i.next().get();
                 if (s == null) {
                     i.remove();
-                }
+                }   
             }
             Collections.sort(spillables, new 
Comparator<WeakReference<Spillable>>() {
 
                 /**
                  * We don't lock anything, so this sort may not be stable if a 
WeakReference suddenly
                  * becomes null, but it will be close enough.
+                 * Also between the time we sort and we use these spillables, 
they
+                 * may actually change in size - so this is just best effort
                  */    
                 public int compare(WeakReference<Spillable> o1Ref, 
WeakReference<Spillable> o2Ref) {
                     Spillable o1 = o1Ref.get();
@@ -133,6 +178,7 @@
                 }
             });
             long estimatedFreed = 0;
+            boolean invokeGC = false;
             for (i = spillables.iterator(); i.hasNext();) {
                 Spillable s = i.next().get();
                 // Still need to check for null here, even after we removed
@@ -143,27 +189,34 @@
                     continue;
                 }
                 long toBeFreed = s.getMemorySize();
+                log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold 
= "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
                 // Don't keep trying if the rest of files are too small
                 if (toBeFreed < spillFileSizeThreshold) {
+                    log.debug("spilling small files - getting out of memory 
handler");
                     break ;
                 }
-                
                 s.spill();               
-                
+                estimatedFreed += toBeFreed;
+                accumulatedFreeSize += toBeFreed;
                 // This should significantly reduce the number of small files
                 // in case that we have a lot of nested bags
-                if (toBeFreed > gcActivationSize) {
-                    System.gc();
+                if (accumulatedFreeSize > gcActivationSize) {
+                    invokeGC = true;
                 }
                 
-                estimatedFreed += toBeFreed;
-                
                 if (estimatedFreed > toFree) {
+                    log.debug("Freed enough space - getting out of memory 
handler");
+                    invokeGC = true;
                     break;
                 }
             }
+            
             /* Poke the GC again to see if we successfully freed enough memory 
*/
-            System.gc();
+            if(invokeGC) {
+                System.gc();
+                // now that we have invoked the GC, reset accumulatedFreeSize
+                accumulatedFreeSize = 0;
+            }
         }
     }
     


Reply via email to