Author: daijy
Date: Thu Apr  1 22:46:50 2010
New Revision: 930123

URL: http://svn.apache.org/viewvc?rev=930123&view=rev
Log:
PIG-1313: PigServer leaks memory over time

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=930123&r1=930122&r2=930123&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Apr  1 22:46:50 2010
@@ -37,6 +37,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1313: PigServer leaks memory over time (billgraham via daijy)
+
 Release 0.7.0 - Unreleased
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=930123&r1=930122&r2=930123&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Thu Apr  1 22:46:50 2010
@@ -99,7 +99,9 @@ import org.apache.pig.tools.grunt.GruntP
  * 
  * This class is the program's connection to Pig. Typically a program will create a PigServer
  * instance. The programmer then registers queries using registerQuery() and
- * retrieves results using openIterator() or store().
+ * retrieves results using openIterator() or store(). After doing so, the
+ * shutdown() method should be called to free any resources used by the current
+ * PigServer instance. Not doing so could result in a memory leak.
  * 
  */
 public class PigServer {
@@ -752,7 +754,12 @@ public class PigServer {
         }
         return aliasPlans;
     }
-    
+
+    /**
+     * Reclaims resources used by this instance of PigServer. This method
+     * deletes all temporary files generated by the current thread while
+     * executing Pig commands.
+     */
     public void shutdown() {
         // clean-up activities
             // TODO: reclaim scope to free up resources. Currently
@@ -760,6 +767,8 @@ public class PigServer {
             // hence, for now, we won't call it.
         //
         // pigContext.getExecutionEngine().reclaimScope(this.scope);
+
+        FileLocalizer.deleteTempFiles();
     }
 
     public Set<String> getAliasKeySet() {

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=930123&r1=930122&r2=930123&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Thu Apr  1 22:46:50 2010
@@ -315,7 +315,6 @@ public class FileLocalizer {
     static public InputStream open(String fileSpec, PigContext pigContext) throws IOException {
         fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
         if (!fileSpec.startsWith(LOCAL_PREFIX)) {
-            init(pigContext);
             ElementDescriptor elem = pigContext.getDfs().asElement(fullPath(fileSpec, pigContext));
             return openDFSFile(elem);
         }
@@ -323,7 +322,6 @@ public class FileLocalizer {
             fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
             //buffering because we only want buffered streams to be passed to load functions.
             /*return new BufferedInputStream(new FileInputStream(fileSpec));*/
-            init(pigContext);
             ElementDescriptor elem = pigContext.getLfs().asElement(fullPath(fileSpec, pigContext));
             return openLFSFile(elem);
         }
@@ -341,7 +339,6 @@ public class FileLocalizer {
      */
     static public SeekableInputStream open(String fileSpec, long offset, PigContext pigContext) throws IOException {
         
-        init(pigContext);
         fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
         
         ElementDescriptor elem;
@@ -378,7 +375,6 @@ public class FileLocalizer {
     static public OutputStream create(String fileSpec, boolean append, PigContext pigContext) throws IOException {
         fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
         if (!fileSpec.startsWith(LOCAL_PREFIX)) {
-            init(pigContext);
             ElementDescriptor elem = pigContext.getDfs().asElement(fileSpec);
             return elem.create();
         }
@@ -400,7 +396,6 @@ public class FileLocalizer {
     static public boolean delete(String fileSpec, PigContext pigContext) throws IOException{
         fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
         if (!fileSpec.startsWith(LOCAL_PREFIX)) {
-            init(pigContext);
             ElementDescriptor elem = pigContext.getDfs().asElement(fileSpec);
             elem.delete();
             return true;
@@ -420,54 +415,113 @@ public class FileLocalizer {
         }
     }
 
-    static Stack<ElementDescriptor> toDelete    = 
-        new Stack<ElementDescriptor>();
-    static Stack<ElementDescriptor> deleteOnFail    = 
-        new Stack<ElementDescriptor>();
     static Random      r           = new Random();
-    static ContainerDescriptor relativeRoot;
-    static boolean     initialized = false;
+
     /**
-     * @param initialized the initialized to set
+     * Thread local toDelete Stack to hold descriptors to be deleted upon calling
+     * deleteTempFiles. Use the toDelete() method to access this stack.
      */
-    public static void setInitialized(boolean initialized) {
-        FileLocalizer.initialized = initialized;
+    private static ThreadLocal<Stack<ElementDescriptor>> toDelete =
+        new ThreadLocal<Stack<ElementDescriptor>>() {
+
+        protected Stack<ElementDescriptor> initialValue() {
+            return new Stack<ElementDescriptor>();
+        }
+    };
+
+    /**
+     * Thread local deleteOnFail Stack to hold descriptors to be deleted upon
+     * calling triggerDeleteOnFail. Use the deleteOnFail() method to access this
+     * stack.
+     */
+    private static ThreadLocal<Stack<ElementDescriptor>> deleteOnFail =
+        new ThreadLocal<Stack<ElementDescriptor>>() {
+
+        protected Stack<ElementDescriptor> initialValue() {
+            return new Stack<ElementDescriptor>();
+        }
+    };
+
+    /**
+     * Thread local relativeRoot ContainerDescriptor. Do not access this object
+     * directly, since it's lazy initialized in the relativeRoot(PigContext)
+     * method, which should be used instead.
+     */
+    private static ThreadLocal<ContainerDescriptor> relativeRoot =
+        new ThreadLocal<ContainerDescriptor>() {
+    };
+
+    /**
+     * Convenience accessor method to the toDelete Stack bound to this thread.
+     * @return A Stack of ElementDescriptors that should be deleted.
+     */
+    private static Stack<ElementDescriptor> toDelete() {
+        return toDelete.get();
     }
 
-    static private void init(final PigContext pigContext) throws DataStorageException {
+    /**
+     * Convenience accessor method to the deleteOnFail Stack bound to this thread.
+     * @return A Stack of ElementDescriptors that should be deleted upon failure.
+     */
+    private static Stack<ElementDescriptor> deleteOnFail() {
+        return deleteOnFail.get();
+    }
+
+    /**
+     * This method is only used by test code to reset state.
+     * @param initialized
+     */
+    public static void setInitialized(boolean initialized) {
         if (!initialized) {
-            initialized = true;
-            relativeRoot = pigContext.getDfs().asContainer("/tmp/temp" + r.nextInt());
-            toDelete.push(relativeRoot);
+            relativeRoot.set(null);
         }
     }
 
+    /**
+     * Accessor method to get the root ContainerDescriptor used for temporary
+     * files bound to this thread. Calling this method lazy-initialized the
+     * relativeRoot object.
+     *
+     * @param pigContext
+     * @return
+     * @throws DataStorageException
+     */
+    private static synchronized ContainerDescriptor relativeRoot(final PigContext pigContext)
+            throws DataStorageException {
+
+        if (relativeRoot.get() == null) {
+            relativeRoot.set(pigContext.getDfs().asContainer("/tmp/temp" + r.nextInt()));
+            toDelete().push(relativeRoot.get());
+        }
+
+        return relativeRoot.get();
+    }
+
     public static void deleteTempFiles() {
-        while (!toDelete.empty()) {
+        while (!toDelete().empty()) {
             try {
-                ElementDescriptor elem = toDelete.pop();
+                ElementDescriptor elem = toDelete().pop();
                 elem.delete();
             } 
             catch (IOException e) {
                 log.error(e);
             }
         }
-        initialized = false;
+        setInitialized(false);
     }
 
     public static synchronized ElementDescriptor 
         getTemporaryPath(ElementDescriptor relative, 
                          PigContext pigContext) throws IOException {
-        init(pigContext);
         if (relative == null) {
-            relative = relativeRoot;
+            relative = relativeRoot(pigContext);
         }
-        if (!relativeRoot.exists()) {
-            relativeRoot.create();
+        if (!relativeRoot(pigContext).exists()) {
+            relativeRoot(pigContext).create();
         }
         ElementDescriptor elem= 
             pigContext.getDfs().asElement(relative.toString(), "tmp" + r.nextInt());
-        toDelete.push(elem);
+        toDelete().push(elem);
         return elem;
     }
 
@@ -584,14 +638,14 @@ public class FileLocalizer {
     
     public static void clearDeleteOnFail()
     {
-       deleteOnFail.clear();
+       deleteOnFail().clear();
     }
     public static void registerDeleteOnFail(String filename, PigContext pigContext) throws IOException
     {
        try {
                 ElementDescriptor elem = pigContext.getDfs().asElement(filename);
-               if (!toDelete.contains(elem))
-                   deleteOnFail.push(elem);
+               if (!toDelete().contains(elem))
+                   deleteOnFail().push(elem);
        }
         catch (DataStorageException e) {
             log.warn("Unable to register output file to delete on failure: " + filename);
@@ -600,9 +654,9 @@ public class FileLocalizer {
     public static void triggerDeleteOnFail()
     {
        ElementDescriptor elem = null;
-       while (!deleteOnFail.empty()) {
+       while (!deleteOnFail().empty()) {
             try {
-                elem = deleteOnFail.pop();
+                elem = deleteOnFail().pop();
                 if (elem.exists())
                        elem.delete();
             } 


Reply via email to