Author: olga
Date: Sat May  3 14:49:42 2008
New Revision: 653148

URL: http://svn.apache.org/viewvc?rev=653148&view=rev
Log:
PIG-229: proper error handling for invalid streaming deserializer

Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=653148&r1=653147&r2=653148&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Sat May  3 14:49:42 2008
@@ -273,3 +273,4 @@
 
     PIG-215: Cleanup a few dangling ends left by PIG-111 (pi_song via gates).
     
+    PIG-229: Proper error handling in case of deserializer failure

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java?rev=653148&r1=653147&r2=653148&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java Sat May  3 14:49:42 2008
@@ -160,4 +160,9 @@
         sb.append(carray, 0, cbuff.position());
         return sb.toString();
     }
+
+    public void close() throws IOException {
+        super.close();
+        in.close();
+    }
 }

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java?rev=653148&r1=653147&r2=653148&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java Sat May  3 14:49:42 2008
@@ -17,9 +17,12 @@
  */
 package org.apache.pig.impl.streaming;
 
+import java.io.IOException;
+
 import org.apache.pig.LoadFunc;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 
 /**
@@ -28,6 +31,7 @@
  * via its <code>stdout</code>.
  */
 public class DefaultOutputHandler extends OutputHandler {
+    BufferedPositionedInputStream stdout;
     
     public DefaultOutputHandler() {
         deserializer = new PigStorage();
@@ -40,4 +44,16 @@
     public OutputType getOutputType() {
         return OutputType.SYNCHRONOUS;
     }
+
+    public void bindTo(String fileName, BufferedPositionedInputStream is,
+            long offset, long end) throws IOException {
+        stdout = is;
+        super.bindTo(fileName, stdout, offset, end);
+    }
+
+    public void close() throws IOException {
+        super.close();
+        stdout.close();
+        stdout = null;
+    }
 }

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=653148&r1=653147&r2=653148&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Sat May  3 14:49:42 2008
@@ -87,6 +87,8 @@
        protected long outputRecords = 0;
        protected long outputBytes = 0;
        
+       protected volatile Throwable outerrThreadsError;
+
        /**
         * Create a new {@link ExecutableManager}.
         */
@@ -202,6 +204,13 @@
                    stderrThread.interrupt();
                }
            }
+           
+           // Check if there was a problem with the managed process
+           if (outerrThreadsError != null) {
+               throw new IOException("Output/Error thread failed with: " +
+                                     outerrThreadsError);
+           }
+ 
        }
 
        /**
@@ -304,6 +313,12 @@
         * @throws IOException
         */
        public void add(Datum d) throws IOException {
+        // Check if there was a problem with the managed process
+           if (outerrThreadsError != null) {
+               throw new IOException("Output/Error thread failed with: " +
+                                     outerrThreadsError);
+           }
+           
                // Pass the serialized tuple to the executable via the InputHandler
            Tuple t = (Tuple)d;
            inputHandler.putNext(t);
@@ -344,6 +359,9 @@
 
                                outputHandler.close();
                        } catch (Throwable t) {
+                           // Note that an error occurred 
+                           outerrThreadsError = t;
+
                                LOG.warn(t);
                                try {
                                    outputHandler.close();
@@ -387,16 +405,19 @@
                                        stderr.close();
                                        LOG.debug("ProcessErrorThread done");
                                }
-                       } catch (Throwable th) {
-                               LOG.warn(th);
+                       } catch (Throwable t) {
+                           // Note that an error occurred 
+                           outerrThreadsError = t;
+
+                               LOG.warn(t);
                                try {
                                        if (stderr != null) {
                                                stderr.close();
                                        }
                                } catch (IOException ioe) {
                                        LOG.info(ioe);
-                       throw new RuntimeException(th);
                                }
+                throw new RuntimeException(t);
                        }
                }
        }


Reply via email to