Author: gates Date: Fri May 2 13:57:56 2008 New Revision: 652906 URL: http://svn.apache.org/viewvc?rev=652906&view=rev Log: PIG-176: Change bag spilling so that bags below a certain threshold are not spilled, thus avoiding proliferation of small files.
Added: incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/conf/pig.properties incubator/pig/trunk/src/org/apache/pig/PigServer.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=652906&r1=652905&r2=652906&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Fri May 2 13:57:56 2008 @@ -264,3 +264,6 @@ PIG-224: fix to error handling code to produce correct error code + PIG-176: Change bag spilling so that bags below a certain threshold are + not spilled, thus avoiding proliferation of small files (pi_song via + gates). Modified: incubator/pig/trunk/conf/pig.properties URL: http://svn.apache.org/viewvc/incubator/pig/trunk/conf/pig.properties?rev=652906&r1=652905&r2=652906&view=diff ============================================================================== --- incubator/pig/trunk/conf/pig.properties (original) +++ incubator/pig/trunk/conf/pig.properties Fri May 2 13:57:56 2008 @@ -32,3 +32,11 @@ #hod.config.dir #hod.param + +#Do not spill bags whose estimated in-memory size is smaller than this (bytes) +#pig.spill.size.threshold=100000 + +#EXPERIMENT: Invoke garbage collection after spilling a bag whose estimated in-memory size exceeds this (bytes) +#This should help reduce the number of files being spilled.
+#pig.spill.gc.activation.size=100000000 + Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=652906&r1=652905&r2=652906&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original) +++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Fri May 2 13:57:56 2008 @@ -133,10 +133,6 @@ this(execType, PropertiesUtil.loadPropertiesFromFile()); } - public PigServer() throws ExecException { - this(ExecType.MAPREDUCE, new Properties()); - } - public PigServer(ExecType execType, Properties properties) throws ExecException { this.pigContext = new PigContext(execType, properties); if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) { Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=652906&r1=652905&r2=652906&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Fri May 2 13:57:56 2008 @@ -60,6 +60,7 @@ import org.apache.pig.impl.util.JarManager; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.WrappedIOException; +import org.apache.pig.impl.util.ConfigurationValidator; /** @@ -133,6 +134,7 @@ JobConf conf = new JobConf(config); setJobProperties(conf, pom); Properties properties = pom.pigContext.getProperties(); + ConfigurationValidator.validatePigProperties(properties) ; String jobName = properties.getProperty(PigContext.JOB_NAME); 
conf.setJobName(jobName); boolean success = false; @@ -160,6 +162,11 @@ String user = System.getProperty("user.name"); conf.setUser(user != null ? user : "Pigster"); + conf.set("pig.spill.size.threshold", + properties.getProperty("pig.spill.size.threshold")) ; + conf.set("pig.spill.gc.activation.size", + properties.getProperty("pig.spill.gc.activation.size")) ; + if (pom.reduceParallelism != -1) { conf.setNumReduceTasks(pom.reduceParallelism); } Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=652906&r1=652905&r2=652906&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Fri May 2 13:57:56 2008 @@ -51,6 +51,7 @@ import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.logicalLayer.LOCogroup; import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.impl.util.SpillableMemoryManager; /** @@ -193,6 +194,12 @@ jobConf.get("mapred.task.id")); properties.setProperty("pig.streaming.task.output.dir", jobConf.getOutputPath().toString()); + properties.setProperty("pig.spill.size.threshold", + jobConf.get("pig.spill.size.threshold")); + properties.setProperty("pig.spill.gc.activation.size", + jobConf.get("pig.spill.gc.activation.size")); + + SpillableMemoryManager.configure(properties) ; } /** Added: incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java?rev=652906&view=auto ============================================================================== --- 
incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java (added) +++ incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java Fri May 2 13:57:56 2008 @@ -0,0 +1,38 @@ +package org.apache.pig.impl.util; + +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class ConfigurationValidator { + + private static final Log log = LogFactory.getLog(ConfigurationValidator.class); + /** + * Validates all Pig configuration values before use; missing or unparsable entries are reset to defaults in place. + * @param properties Pig properties to check and repair in place + */ + + public static void validatePigProperties(Properties properties) { + ensureLongType(properties, "pig.spill.size.threshold", 0L) ; + ensureLongType(properties, "pig.spill.gc.activation.size", Long.MAX_VALUE) ; + } + // Guarantees that key holds a long-parsable value, substituting defaultValue if it is absent or malformed. + private static void ensureLongType(Properties properties, + String key, + long defaultValue) { + String str = properties.getProperty(key) ; + if (str != null) { + try { + Long.parseLong(str) ; + } + catch (NumberFormatException nfe) { + log.error("Property " + key + "=" + str + " has to be parsable to long; using default " + defaultValue) ; + properties.setProperty(key, Long.toString(defaultValue)) ; + } + } + else { + properties.setProperty(key, Long.toString(defaultValue)) ; + } + } +} Modified: incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=652906&r1=652905&r2=652906&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java Fri May 2 13:57:56 2008 @@ -55,6 +55,9 @@ properties.put(entry.getKey(), entry.getValue()); } } + + // Validate eagerly so malformed settings are reported (and defaulted) at load time + ConfigurationValidator.validatePigProperties(properties) ; } public static Properties loadPropertiesFromFile() { @@ -62,4 +65,5 @@
loadPropertiesFromFile(properties); return properties; } + } Modified: incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=652906&r1=652905&r2=652906&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Fri May 2 13:57:56 2008 @@ -10,6 +10,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Properties; import javax.management.Notification; import javax.management.NotificationEmitter; @@ -34,6 +35,9 @@ LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>(); + private static volatile long gcActivationSize = Long.MAX_VALUE ; + private static volatile long spillFileSizeThreshold = 0L ; + public SpillableMemoryManager() { ((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null); List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans(); @@ -63,6 +67,21 @@ biggestHeap.setCollectionUsageThreshold((long)(biggestSize*.5)); } + public static void configure(Properties properties) { + // Both values are expected to be long-parsable (see ConfigurationValidator.validatePigProperties). + try { + spillFileSizeThreshold = Long.parseLong( + properties.getProperty("pig.spill.size.threshold") ) ; + gcActivationSize = Long.parseLong( + properties.getProperty("pig.spill.gc.activation.size") ) ; + } + catch (NumberFormatException nfe) { + throw new RuntimeException("Error while converting system configurations " + + "pig.spill.size.threshold, pig.spill.gc.activation.size", nfe) ; + } + + } + public void handleNotification(Notification n, Object o) { CompositeData cd = (CompositeData) n.getUserData(); MemoryNotificationInfo info = MemoryNotificationInfo.from(cd); @@ -124,8 +143,21 @@ continue; } long
toBeFreed = s.getMemorySize(); - s.spill(); + // Don't keep trying if the rest of files are too small + if (toBeFreed < spillFileSizeThreshold) { + break ; + } + + s.spill(); + + // This should significantly reduce the number of small files + // in case that we have a lot of nested bags + if (toBeFreed > gcActivationSize) { + System.gc(); + } + estimatedFreed += toBeFreed; + if (estimatedFreed > toFree) { break; }