Author: olga Date: Wed Jul 16 16:33:43 2008 New Revision: 677466 URL: http://svn.apache.org/viewvc?rev=677466&view=rev Log: PIG-129 and PIG-164 merge
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java Modified: incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java?rev=677466&r1=677465&r2=677466&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java Wed Jul 16 16:33:43 2008 @@ -31,12 +31,17 @@ import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.impl.physicalLayer.PhysicalOperator; import org.apache.pig.impl.util.Spillable; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * Default implementation of DataBag. This is the an abstract class used as a * parent for all three of the types of data bags. */ public abstract class DefaultAbstractBag implements DataBag { + + private static final Log log = LogFactory.getLog(DataBag.class); + // Container that holds the tuples. Actual object instantiated by // subclasses. protected Collection<Tuple> mContents; @@ -302,6 +307,32 @@ mSpillFiles = new ArrayList<File>(1); } + String tmpDirName= System.getProperties().getProperty("java.io.tmpdir") ; + File tmpDir = new File(tmpDirName); + + // if the directory does not exist, create it. + if (!tmpDir.exists()){ + log.info("Temporary directory doesn't exists. Trying to create: " + tmpDir.getAbsolutePath()); + // Create the directory and see if it was successful + if (tmpDir.mkdir()){ + log.info("Successfully created temporary directory: " + tmpDir.getAbsolutePath()); + } else { + // If execution reaches here, it means that we needed to create the directory but + // were not successful in doing so. + // + // If this directory is created recently then we can simply + // skip creation. This is to address a rare issue occuring in a cluster despite the + // the fact that spill() makes call to getSpillFile() in a synchronized + // block. + if (tmpDir.exists()) { + log.info("Temporary directory already exists: " + tmpDir.getAbsolutePath()); + } else { + log.error("Unable to create temporary directory: " + tmpDir.getAbsolutePath()); + throw new IOException("Unable to create temporary directory: " + tmpDir.getAbsolutePath() ); + } + } + } + File f = File.createTempFile("pigbag", null); f.deleteOnExit(); mSpillFiles.add(f); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=677466&r1=677465&r2=677466&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java Wed Jul 16 16:33:43 2008 @@ -49,7 +49,7 @@ private final Log log = LogFactory.getLog(getClass()); - List<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>(); + LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>(); public SpillableMemoryManager() { ((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null); @@ -159,6 +159,13 @@ */ public void registerSpillable(Spillable s) { synchronized(spillables) { + // Cleaing the entire list is too expensive. Just trim off the front while + // we can. + WeakReference<Spillable> first = spillables.peek(); + while (first != null && first.get() == null) { + spillables.remove(); + first = spillables.peek(); + } spillables.add(new WeakReference<Spillable>(s)); } }