Author: olga
Date: Tue Jul 22 17:50:20 2008
New Revision: 678959

URL: http://svn.apache.org/viewvc?rev=678959&view=rev
Log:
Merge of PIG-176, PIG-170, PIG-235 and PIG-330

Modified:
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=678959&r1=678958&r2=678959&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Tue Jul 22 17:50:20 2008
@@ -114,10 +114,6 @@
         this(execType, PropertiesUtil.loadPropertiesFromFile());
     }
 
-    public PigServer() throws ExecException {
-        this(ExecType.MAPREDUCE);
-    }
-    
     public PigServer(ExecType execType, Properties properties) throws 
ExecException {
         this.pigContext = new PigContext(execType, properties);
         if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) 
== null) {

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=678959&r1=678958&r2=678959&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Tue Jul 22 17:50:20 2008
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -12,12 +13,15 @@
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ConfigurationValidator;
 
 
 public class LocalLauncher extends Launcher{
@@ -35,7 +39,9 @@
         MRCompiler comp = new MRCompiler(php, pc);
         comp.compile();
         
-        Configuration conf = new Configuration();
+        ExecutionEngine exe = pc.getExecutionEngine();
+        Properties validatedProperties = 
ConfigurationValidator.getValidatedProperties(exe.getConfiguration());
+        Configuration conf = 
ConfigurationUtil.toConfiguration(validatedProperties);
         conf.set("mapred.job.tracker", "local");
         JobClient jobClient = new JobClient(new JobConf(conf));
 

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=678959&r1=678958&r2=678959&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Jul 22 17:50:20 2008
@@ -22,6 +22,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ConfigurationValidator;
 
 /**
  * Main class that launches pig for Map Reduce
@@ -42,6 +43,7 @@
         comp.compile();
         
         ExecutionEngine exe = pc.getExecutionEngine();
+        ConfigurationValidator.validatePigProperties(exe.getConfiguration());
         Configuration conf = 
ConfigurationUtil.toConfiguration(exe.getConfiguration());
         JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
 

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=678959&r1=678958&r2=678959&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Tue Jul 22 17:50:20 2008
@@ -17,12 +17,14 @@
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
 
 public abstract class PigMapBase extends MapReduceBase{
     private final Log log = LogFactory.getLog(getClass());
@@ -52,6 +54,7 @@
     @Override
     public void configure(JobConf job) {
         super.configure(job);
+        SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
         PigMapReduce.sJobConf = job;
         try {
             mp = (PhysicalPlan) ObjectSerializer.deserialize(job

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=678959&r1=678958&r2=678959&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Tue Jul 22 17:50:20 2008
@@ -38,12 +38,14 @@
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
 
 /**
  * This class is the static Mapper & Reducer classes that
@@ -104,6 +106,7 @@
         @Override
         public void configure(JobConf jConf) {
             super.configure(jConf);
+            
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
             sJobConf = jConf;
             try {
                 rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java?rev=678959&r1=678958&r2=678959&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java Tue Jul 22 17:50:20 2008
@@ -303,7 +303,7 @@
         // pointer in the tuple.
         switch (DataType.findType(o)) {
             case DataType.BYTEARRAY: {
-                byte[] bytes = (byte[])o;
+                byte[] bytes = ((DataByteArray)o).get();
                 return bytes.length + 12;
             }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java?rev=678959&r1=678958&r2=678959&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java Tue Jul 22 17:50:20 2008
@@ -32,8 +32,29 @@
      */
     
     public static void validatePigProperties(Properties properties) {
-        ensureLongType(properties, "pig.spill.size.threshold", 0L) ;
-        ensureLongType(properties, "pig.spill.gc.activation.size", 
Long.MAX_VALUE) ;   
+        ensureLongType(properties, "pig.spill.size.threshold", 5000000L) ;
+        ensureLongType(properties, "pig.spill.gc.activation.size", 40000000L) 
;   
+    }
+    
+    /**
+     * Validate properties which need to be validated and return *ONLY* those
+     * @param properties The Properties object containing all PIG properties
+     * @return The properties object containing *ONLY* properties which were 
validated
+     * (Typically these are user editable properties and should match the 
properties
+     * validated in ValidatePigProperties(Properties properties))
+     */
+    public static Properties getValidatedProperties(Properties properties) {
+        Properties result = new Properties();
+        String[] propertiesToValidate = { "pig.spill.size.threshold", 
"pig.spill.gc.activation.size" };
+        
+        // validate the incoming properties
+        validatePigProperties(properties);
+        
+        // return only properties that we validated
+        for (String p : propertiesToValidate) {
+            result.setProperty(p, properties.getProperty(p));
+        }        
+        return result;        
     }
     
     private static void ensureLongType(Properties properties,

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=678959&r1=678958&r2=678959&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java Tue Jul 22 17:50:20 2008
@@ -27,6 +27,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Properties;
 
 import javax.management.Notification;
 import javax.management.NotificationEmitter;
@@ -51,6 +52,26 @@
     
     LinkedList<WeakReference<Spillable>> spillables = new 
LinkedList<WeakReference<Spillable>>();
     
+    // if we freed at least this much, invoke GC 
+    // (default 40 MB - this can be overridden by user supplied property)
+    private static long gcActivationSize = 40000000L ;
+    
+    // spill file size should be at least this much
+    // (default 5MB - this can be overridden by user supplied property)
+    private static long spillFileSizeThreshold = 5000000L ;
+    
+    // this will keep track of memory freed across spills
+    // and between GC invocations
+    private static long accumulatedFreeSize = 0L;
+    
+    // fraction of biggest heap for which we want to get
+    // "memory usage threshold exceeded" notifications
+    private static double memoryThresholdFraction = 0.7;
+    
+    // fraction of biggest heap for which we want to get
+    // "collection threshold exceeded" notifications
+    private static double collectionMemoryThresholdFraction = 0.5;
+        
     public SpillableMemoryManager() {
         
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this,
 null, null);
         List<MemoryPoolMXBean> mpbeans = 
ManagementFactory.getMemoryPoolMXBeans();
@@ -75,16 +96,57 @@
         }
         log.debug("Selected heap to monitor (" +
             biggestHeap.getName() + ")");
+        
+        // we want to set both collection and usage threshold alerts to be 
+        // safe. In some local tests after a point only collection threshold
+        // notifications were being sent though usage threshold notifications
+        // were sent early on. So using both would ensure that
+        // 1) we get notified early (though usage threshold exceeded 
notifications)
+        // 2) we get notified always when threshold is exceeded (either usage 
or
+        //    collection)
+        
         /* We set the threshold to be 50% of tenured since that is where
          * the GC starts to dominate CPU time according to Sun doc */
-        biggestHeap.setCollectionUsageThreshold((long)(biggestSize*.5));    
+        biggestHeap.setCollectionUsageThreshold((long)(biggestSize * 
collectionMemoryThresholdFraction));
+        // we set a higher threshold for usage threshold exceeded notification
+        // since this is more likely to be effective sooner and we do not
+        // want to be spilling too soon
+        biggestHeap.setUsageThreshold((long)(biggestSize * 
memoryThresholdFraction));
+    }
+    
+    public static void configure(Properties properties) {
+        
+        try {
+            
+            spillFileSizeThreshold = Long.parseLong(
+                        (String) 
properties.getProperty("pig.spill.size.threshold") ) ;
+            
+            gcActivationSize = Long.parseLong(
+                    (String) 
properties.getProperty("pig.spill.gc.activation.size") ) ;
+        } 
+        catch (NumberFormatException  nfe) {
+            throw new RuntimeException("Error while converting system 
configurations" +
+                       "spill.size.threshold, spill.gc.activation.size", nfe) ;
+        }
     }
     
     public void handleNotification(Notification n, Object o) {
         CompositeData cd = (CompositeData) n.getUserData();
         MemoryNotificationInfo info = MemoryNotificationInfo.from(cd);
-        log.info("low memory handler called " + info.getUsage());
-        long toFree = info.getUsage().getUsed() - 
(long)(info.getUsage().getMax()*.5);
+        // free the amount exceeded over the threshold and then a further half
+        // so if threshold = heapmax/2, we will be trying to free
+        // used - heapmax/2 + heapmax/4
+        long toFree = 0L;
+        
if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
+            long threshold = (long)(info.getUsage().getMax() * 
memoryThresholdFraction);
+            toFree = info.getUsage().getUsed() - threshold + (long)(threshold 
* 0.5);
+            log.info("low memory handler called (Usage threshold exceeded) " + 
info.getUsage());
+        } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
+            long threshold = (long)(info.getUsage().getMax() * 
collectionMemoryThresholdFraction);
+            toFree = info.getUsage().getUsed() - threshold + (long)(threshold 
* 0.5);
+            log.info("low memory handler called (Collection threshold 
exceeded) " + info.getUsage());
+        }
+         
         if (toFree < 0) {
             log.debug("low memory handler returning " + 
                 "because there is nothing to free");
@@ -105,6 +167,8 @@
                 /**
                  * We don't lock anything, so this sort may not be stable if a 
WeakReference suddenly
                  * becomes null, but it will be close enough.
+                 * Also between the time we sort and we use these spillables, 
they
+                 * may actually change in size - so this is just best effort
                  */    
                 public int compare(WeakReference<Spillable> o1Ref, 
WeakReference<Spillable> o2Ref) {
                     Spillable o1 = o1Ref.get();
@@ -113,10 +177,10 @@
                         return 0;
                     }
                     if (o1 == null) {
-                        return -1;
+                        return 1;
                     }
                     if (o2 == null) {
-                        return 1;
+                        return -1;
                     }
                     long o1Size = o1.getMemorySize();
                     long o2Size = o2.getMemorySize();
@@ -125,12 +189,13 @@
                         return 0;
                     }
                     if (o1Size < o2Size) {
-                        return -1;
+                        return 1;
                     }
-                    return 1;
+                    return -1;
                 }
             });
             long estimatedFreed = 0;
+            boolean invokeGC = false;
             for (i = spillables.iterator(); i.hasNext();) {
                 Spillable s = i.next().get();
                 // Still need to check for null here, even after we removed
@@ -141,14 +206,34 @@
                     continue;
                 }
                 long toBeFreed = s.getMemorySize();
-                s.spill();
+                log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold 
= "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
+                // Don't keep trying if the rest of files are too small
+                if (toBeFreed < spillFileSizeThreshold) {
+                    log.debug("spilling small files - getting out of memory 
handler");
+                    break ;
+                }
+                s.spill();               
                 estimatedFreed += toBeFreed;
+                accumulatedFreeSize += toBeFreed;
+                // This should significantly reduce the number of small files
+                // in case that we have a lot of nested bags
+                if (accumulatedFreeSize > gcActivationSize) {
+                    invokeGC = true;
+                }
+                
                 if (estimatedFreed > toFree) {
+                    log.debug("Freed enough space - getting out of memory 
handler");
+                    invokeGC = true;
                     break;
                 }
             }
+
             /* Poke the GC again to see if we successfully freed enough memory 
*/
-            System.gc();
+            if(invokeGC) {
+                System.gc();
+                // now that we have invoked the GC, reset accumulatedFreeSize
+                accumulatedFreeSize = 0;
+            }
         }
     }
     


Reply via email to