Author: olga
Date: Wed Jul 16 16:33:43 2008
New Revision: 677466

URL: http://svn.apache.org/viewvc?rev=677466&view=rev
Log:
PIG-129 and PIG-164 merge

Modified:
    incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java

Modified: 
incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java?rev=677466&r1=677465&r2=677466&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java 
Wed Jul 16 16:33:43 2008
@@ -31,12 +31,17 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.util.Spillable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Default implementation of DataBag.  This is the an abstract class used as a
  * parent for all three of the types of data bags.
  */
 public abstract class DefaultAbstractBag implements DataBag {
+
+     private static final Log log = LogFactory.getLog(DataBag.class);
+
     // Container that holds the tuples. Actual object instantiated by
     // subclasses.
     protected Collection<Tuple> mContents;
@@ -302,6 +307,32 @@
             mSpillFiles = new ArrayList<File>(1);
         }
 
+        String tmpDirName= 
System.getProperties().getProperty("java.io.tmpdir") ;                
+        File tmpDir = new File(tmpDirName);
+  
+        // if the directory does not exist, create it.
+        if (!tmpDir.exists()){
+            log.info("Temporary directory doesn't exists. Trying to create: " 
+ tmpDir.getAbsolutePath());
+          // Create the directory and see if it was successful
+          if (tmpDir.mkdir()){
+            log.info("Successfully created temporary directory: " + 
tmpDir.getAbsolutePath());
+          } else {
+              // If execution reaches here, it means that we needed to create 
the directory but
+              // were not successful in doing so.
+              // 
+              // If this directory is created recently then we can simply 
+              // skip creation. This is to address a rare issue occurring in a 
cluster despite the
+              // fact that spill() makes a call to getSpillFile() in a 
synchronized 
+              // block. 
+              if (tmpDir.exists()) {
+                log.info("Temporary directory already exists: " + 
tmpDir.getAbsolutePath());
+              } else {
+                log.error("Unable to create temporary directory: " + 
tmpDir.getAbsolutePath());
+                throw new IOException("Unable to create temporary directory: " 
+ tmpDir.getAbsolutePath() );                  
+              }
+          }
+        }
+        
         File f = File.createTempFile("pigbag", null);
         f.deleteOnExit();
         mSpillFiles.add(f);

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=677466&r1=677465&r2=677466&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java
 Wed Jul 16 16:33:43 2008
@@ -49,7 +49,7 @@
     
     private final Log log = LogFactory.getLog(getClass());
     
-    List<WeakReference<Spillable>> spillables = new 
LinkedList<WeakReference<Spillable>>();
+    LinkedList<WeakReference<Spillable>> spillables = new 
LinkedList<WeakReference<Spillable>>();
     
     public SpillableMemoryManager() {
         
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this,
 null, null);
@@ -159,6 +159,13 @@
      */
     public void registerSpillable(Spillable s) {
         synchronized(spillables) {
+            // Cleaning the entire list is too expensive.  Just trim off the 
front while
+            // we can.
+            WeakReference<Spillable> first = spillables.peek();
+            while (first != null && first.get() == null) {
+                spillables.remove();
+                first = spillables.peek();
+            }
             spillables.add(new WeakReference<Spillable>(s));
         }
     }


Reply via email to