Author: olga Date: Tue Jul 22 17:50:20 2008 New Revision: 678959 URL: http://svn.apache.org/viewvc?rev=678959&view=rev Log: merge of PIG-176-170-235-330
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=678959&r1=678958&r2=678959&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Tue Jul 22 17:50:20 2008 @@ -114,10 +114,6 @@ this(execType, PropertiesUtil.loadPropertiesFromFile()); } - public PigServer() throws ExecException { - this(ExecType.MAPREDUCE); - } - public PigServer(ExecType execType, Properties properties) throws ExecException { this.pigContext = new PigContext(execType, properties); if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) { Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=678959&r1=678958&r2=678959&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Tue Jul 22 17:50:20 2008 @@ -3,6 +3,7 @@ import java.io.IOException; import java.io.PrintStream; import java.util.List; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -12,12 +13,15 @@ import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.executionengine.ExecutionEngine; import org.apache.pig.impl.PigContext; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.ConfigurationValidator; public class LocalLauncher extends Launcher{ @@ -35,7 +39,9 @@ MRCompiler comp = new MRCompiler(php, pc); comp.compile(); - Configuration conf = new Configuration(); + ExecutionEngine exe = pc.getExecutionEngine(); + Properties validatedProperties = ConfigurationValidator.getValidatedProperties(exe.getConfiguration()); + Configuration conf = ConfigurationUtil.toConfiguration(validatedProperties); conf.set("mapred.job.tracker", "local"); JobClient jobClient = new JobClient(new JobConf(conf)); Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=678959&r1=678958&r2=678959&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Jul 22 17:50:20 2008 @@ -22,6 +22,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.ConfigurationValidator; /** * Main class that launches pig for Map Reduce @@ -42,6 +43,7 @@ comp.compile(); ExecutionEngine exe = pc.getExecutionEngine(); + ConfigurationValidator.validatePigProperties(exe.getConfiguration()); Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration()); JobClient jobClient = ((HExecutionEngine)exe).getJobClient(); Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=678959&r1=678958&r2=678959&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Tue Jul 22 17:50:20 2008 @@ -17,12 +17,14 @@ import org.apache.pig.data.TargetedTuple; import org.apache.pig.data.Tuple; import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.impl.util.SpillableMemoryManager; public abstract class PigMapBase extends MapReduceBase{ private final Log log = LogFactory.getLog(getClass()); @@ -52,6 +54,7 @@ @Override public void configure(JobConf job) { super.configure(job); + SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job)); PigMapReduce.sJobConf = job; try { mp = (PhysicalPlan) ObjectSerializer.deserialize(job Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=678959&r1=678958&r2=678959&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Tue Jul 22 17:50:20 2008 @@ -38,12 +38,14 @@ import org.apache.pig.data.IndexedTuple; import org.apache.pig.data.TargetedTuple; import org.apache.pig.data.Tuple; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.impl.util.SpillableMemoryManager; /** * This class is the static Mapper & Reducer classes that @@ -104,6 +106,7 @@ @Override public void configure(JobConf jConf) { super.configure(jConf); + SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf)); sJobConf = jConf; try { rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf Modified: incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java?rev=678959&r1=678958&r2=678959&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java Tue Jul 22 17:50:20 2008 @@ -303,7 +303,7 @@ // pointer in the tuple. switch (DataType.findType(o)) { case DataType.BYTEARRAY: { - byte[] bytes = (byte[])o; + byte[] bytes = ((DataByteArray)o).get(); return bytes.length + 12; } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java?rev=678959&r1=678958&r2=678959&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java Tue Jul 22 17:50:20 2008 @@ -32,8 +32,29 @@ */ public static void validatePigProperties(Properties properties) { - ensureLongType(properties, "pig.spill.size.threshold", 0L) ; - ensureLongType(properties, "pig.spill.gc.activation.size", Long.MAX_VALUE) ; + ensureLongType(properties, "pig.spill.size.threshold", 5000000L) ; + ensureLongType(properties, "pig.spill.gc.activation.size", 40000000L) ; + } + + /** + * Validate properties which need to be validated and return *ONLY* those + * @param properties The Properties object containing all PIG properties + * @return The properties object containing *ONLY* properties which were validated + * (Typically these are user editable properties and should match the properties + * validated in ValidatePigProperties(Properties properties)) + */ + public static Properties getValidatedProperties(Properties properties) { + Properties result = new Properties(); + String[] propertiesToValidate = { "pig.spill.size.threshold", "pig.spill.gc.activation.size" }; + + // validate the incoming properties + validatePigProperties(properties); + + // return only properties that we validated + for (String p : propertiesToValidate) { + result.setProperty(p, properties.getProperty(p)); + } + return result; } private static void ensureLongType(Properties properties, Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=678959&r1=678958&r2=678959&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java Tue Jul 22 17:50:20 2008 @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Properties; import javax.management.Notification; import javax.management.NotificationEmitter; @@ -51,6 +52,26 @@ LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>(); + // if we freed at least this much, invoke GC + // (default 40 MB - this can be overridden by user supplied property) + private static long gcActivationSize = 40000000L ; + + // spill file size should be at least this much + // (default 5MB - this can be overridden by user supplied property) + private static long spillFileSizeThreshold = 5000000L ; + + // this will keep track of memory freed across spills + // and between GC invocations + private static long accumulatedFreeSize = 0L; + + // fraction of biggest heap for which we want to get + // "memory usage threshold exceeded" notifications + private static double memoryThresholdFraction = 0.7; + + // fraction of biggest heap for which we want to get + // "collection threshold exceeded" notifications + private static double collectionMemoryThresholdFraction = 0.5; + public SpillableMemoryManager() { ((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null); List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans(); @@ -75,16 +96,57 @@ } log.debug("Selected heap to monitor (" + biggestHeap.getName() + ")"); + + // we want to set both collection and usage threshold alerts to be + // safe. In some local tests after a point only collection threshold + // notifications were being sent though usage threshold notifications + // were sent early on. So using both would ensure that + // 1) we get notified early (though usage threshold exceeded notifications) + // 2) we get notified always when threshold is exceeded (either usage or + // collection) + /* We set the threshold to be 50% of tenured since that is where * the GC starts to dominate CPU time according to Sun doc */ - biggestHeap.setCollectionUsageThreshold((long)(biggestSize*.5)); + biggestHeap.setCollectionUsageThreshold((long)(biggestSize * collectionMemoryThresholdFraction)); + // we set a higher threshold for usage threshold exceeded notification + // since this is more likely to be effective sooner and we do not + // want to be spilling too soon + biggestHeap.setUsageThreshold((long)(biggestSize * memoryThresholdFraction)); + } + + public static void configure(Properties properties) { + + try { + + spillFileSizeThreshold = Long.parseLong( + (String) properties.getProperty("pig.spill.size.threshold") ) ; + + gcActivationSize = Long.parseLong( + (String) properties.getProperty("pig.spill.gc.activation.size") ) ; + } + catch (NumberFormatException nfe) { + throw new RuntimeException("Error while converting system configurations" + + "spill.size.threshold, spill.gc.activation.size", nfe) ; + } } public void handleNotification(Notification n, Object o) { CompositeData cd = (CompositeData) n.getUserData(); MemoryNotificationInfo info = MemoryNotificationInfo.from(cd); - log.info("low memory handler called " + info.getUsage()); - long toFree = info.getUsage().getUsed() - (long)(info.getUsage().getMax()*.5); + // free the amount exceeded over the threshold and then a further half + // so if threshold = heapmax/2, we will be trying to free + // used - heapmax/2 + heapmax/4 + long toFree = 0L; + if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) { + long threshold = (long)(info.getUsage().getMax() * memoryThresholdFraction); + toFree = info.getUsage().getUsed() - threshold + (long)(threshold * 0.5); + log.info("low memory handler called (Usage threshold exceeded) " + info.getUsage()); + } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE + long threshold = (long)(info.getUsage().getMax() * collectionMemoryThresholdFraction); + toFree = info.getUsage().getUsed() - threshold + (long)(threshold * 0.5); + log.info("low memory handler called (Collection threshold exceeded) " + info.getUsage()); + } + if (toFree < 0) { log.debug("low memory handler returning " + "because there is nothing to free"); @@ -105,6 +167,8 @@ /** * We don't lock anything, so this sort may not be stable if a WeakReference suddenly * becomes null, but it will be close enough. + * Also between the time we sort and we use these spillables, they + * may actually change in size - so this is just best effort */ public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) { Spillable o1 = o1Ref.get(); @@ -113,10 +177,10 @@ return 0; } if (o1 == null) { - return -1; + return 1; } if (o2 == null) { - return 1; + return -1; } long o1Size = o1.getMemorySize(); long o2Size = o2.getMemorySize(); @@ -125,12 +189,13 @@ return 0; } if (o1Size < o2Size) { - return -1; + return 1; } - return 1; + return -1; } }); long estimatedFreed = 0; + boolean invokeGC = false; for (i = spillables.iterator(); i.hasNext();) { Spillable s = i.next().get(); // Still need to check for null here, even after we removed @@ -141,14 +206,34 @@ continue; } long toBeFreed = s.getMemorySize(); - s.spill(); + log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize); + // Don't keep trying if the rest of files are too small + if (toBeFreed < spillFileSizeThreshold) { + log.debug("spilling small files - getting out of memory handler"); + break ; + } + s.spill(); estimatedFreed += toBeFreed; + accumulatedFreeSize += toBeFreed; + // This should significantly reduce the number of small files + // in case that we have a lot of nested bags + if (accumulatedFreeSize > gcActivationSize) { + invokeGC = true; + } + if (estimatedFreed > toFree) { + log.debug("Freed enough space - getting out of memory handler"); + invokeGC = true; break; } } + /* Poke the GC again to see if we successfully freed enough memory */ - System.gc(); + if(invokeGC) { + System.gc(); + // now that we have invoked the GC, reset accumulatedFreeSize + accumulatedFreeSize = 0; + } } }