Author: rding
Date: Mon Aug 30 17:40:11 2010
New Revision: 990877

URL: http://svn.apache.org/viewvc?rev=990877&view=rev
Log:
PIG-1343: pig_log file missing even though Main tells it is creating one and an 
M/R job fails

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Aug 30 17:40:11 2010
@@ -171,6 +171,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1343: pig_log file missing even though Main tells it is creating one and
+an M/R job fails (nrai via rding)
+
 PIG-1482: Pig gets confused when more than one loader is involved (xuefuz via 
thejas)
 
 PIG-1579: Intermittent unit test failure for 
TestScriptUDF.testPythonScriptUDFNullInputOutput (daijy)

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Mon Aug 30 17:40:11 2010
@@ -103,10 +103,12 @@ import org.apache.pig.pen.ExampleGenerat
 import org.apache.pig.scripting.ScriptEngine;
 import org.apache.pig.tools.grunt.GruntParser;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
+import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 
 
 /**
@@ -323,13 +325,20 @@ public class PigServer {
     public List<ExecJob> executeBatch() throws FrontendException, 
ExecException {
         PigStats stats = executeBatchEx();
         LinkedList<ExecJob> jobs = new LinkedList<ExecJob>();
-        for (OutputStats output : stats.getOutputStats()) {
-            if (output.isSuccessful()) {
-                jobs.add(new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, output
-                        .getPOStore(), output.getAlias(), stats));
-            } else {
-                jobs.add(new HJob(HJob.JOB_STATUS.FAILED, pigContext, output
-                        .getPOStore(), output.getAlias(), stats));
+        JobGraph jGraph = stats.getJobGraph();
+        Iterator<JobStats> iter = jGraph.iterator();
+        while (iter.hasNext()) {
+            JobStats js = iter.next();
+            for (OutputStats output : js.getOutputs()) {
+                if (js.isSuccessful()) {                
+                    jobs.add(new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, 
output
+                            .getPOStore(), output.getAlias(), stats));
+                } else {
+                    HJob hjob = new HJob(HJob.JOB_STATUS.FAILED, pigContext, 
output
+                            .getPOStore(), output.getAlias(), stats);
+                    hjob.setException(js.getException());
+                    jobs.add(hjob);
+                }
             }
         }
         return jobs;
@@ -338,7 +347,7 @@ public class PigServer {
     private PigStats executeBatchEx() throws FrontendException, ExecException {
         if (!isMultiQuery) {
             // ignore if multiquery is off
-            return PigStatsUtil.getEmptyPigStats();
+            return PigStats.get();
         }
 
         if (currDAG == null || !isBatchOn()) {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
 Mon Aug 30 17:40:11 2010
@@ -178,47 +178,37 @@ public abstract class Launcher {
        for (int i = 0; i < reports.length; i++) {
             String msgs[] = reports[i].getDiagnostics();
             ArrayList<Exception> exceptions = new ArrayList<Exception>();
-            boolean jobFailed = false;
-            float successfulProgress = 1.0f;
-            if (msgs.length > 0) {
-               //if the progress reported is not 1.0f then the map or reduce 
job failed
-               //this comparison is in place till Hadoop 0.20 provides methods 
to query
-               //job status                    
-               if(reports[i].getProgress() != successfulProgress) {
-                    jobFailed = true;
-               }
-                Set<String> errorMessageSet = new HashSet<String>();
-                for (int j = 0; j < msgs.length; j++) {                        
-                    if(!errorMessageSet.contains(msgs[j])) {
-                        errorMessageSet.add(msgs[j]);
-                        if (errNotDbg) {
-                            //errNotDbg is used only for failed jobs
-                            //keep track of all the unique exceptions
-                            try {
-                                LogUtils.writeLog("Backend error message", 
msgs[j], 
-                                        
pigContext.getProperties().getProperty("pig.logfile"),
-                                        log);
-                                Exception e = getExceptionFromString(msgs[j]);
-                                exceptions.add(e);
-                            } catch (Exception e1) {
-                                String firstLine = 
getFirstLineFromMessage(msgs[j]);                                
-                                int errCode = 2997;
-                                String msg = "Unable to recreate exception 
from backed error: " + firstLine;
-                                throw new ExecException(msg, errCode, 
PigException.BUG);
-                            }
-                        } else {
-                            log.debug("Error message from task (" + type + ") 
" +
-                                      reports[i].getTaskID() + msgs[j]);
+            Set<String> errorMessageSet = new HashSet<String>();
+            for (int j = 0; j < msgs.length; j++) {                    
+                if(!errorMessageSet.contains(msgs[j])) {
+                    errorMessageSet.add(msgs[j]);
+                    if (errNotDbg) {
+                        //errNotDbg is used only for failed jobs
+                        //keep track of all the unique exceptions
+                        try {
+                            LogUtils.writeLog("Backend error message", 
msgs[j], 
+                                    
pigContext.getProperties().getProperty("pig.logfile"),
+                                    log);
+                            Exception e = getExceptionFromString(msgs[j]);
+                            exceptions.add(e);
+                        } catch (Exception e1) {
+                            String firstLine = 
getFirstLineFromMessage(msgs[j]);                                
+                            int errCode = 2997;
+                            String msg = "Unable to recreate exception from 
backed error: " + firstLine;
+                            throw new ExecException(msg, errCode, 
PigException.BUG);
                         }
+                    } else {
+                        log.debug("Error message from task (" + type + ") " +
+                                  reports[i].getTaskID() + msgs[j]);
                     }
                 }
-            }
+            }            
             
             //if its a failed job then check if there is more than one 
exception
             //more than one exception implies possibly different kinds of 
failures
             //log all the different failures and throw the exception 
corresponding
             //to the first failure
-            if(jobFailed) {
+            if (errNotDbg) {
                 if(exceptions.size() > 1) {
                     for(int j = 0; j < exceptions.size(); ++j) {
                         String headerMessage = "Error message from task (" + 
type + ") " + reports[i].getTaskID();
@@ -233,7 +223,7 @@ public abstract class Launcher {
                        throw new ExecException(msg, errCode, PigException.BUG);
                 }
             }
-        }
+        }        
     }
     
     /**

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Mon Aug 30 
17:40:11 2010
@@ -56,6 +56,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
@@ -112,16 +113,16 @@ public class GruntParser extends PigScri
                 for(ExecJob job: jobs) {
                     if (job.getStatus() == ExecJob.JOB_STATUS.FAILED) {
                         mNumFailedJobs++;
-                        if (job.getException() != null) {
-                            LogUtils.writeLog(
-                              job.getException(), 
-                              
mPigServer.getPigContext().getProperties().getProperty("pig.logfile"), 
-                              log, 
-                              
"true".equalsIgnoreCase(mPigServer.getPigContext().getProperties().getProperty("verbose")),
-                              "Pig Stack Trace");
-                        }
-                    }
-                    else {
+                        Exception exp = (job.getException() != null) ? 
job.getException()
+                                : new ExecException(
+                                        "Job failed, hadoop does not return 
any error message",
+                                        2244);                        
+                        LogUtils.writeLog(exp, 
+                                
mPigServer.getPigContext().getProperties().getProperty("pig.logfile"), 
+                                log, 
+                                
"true".equalsIgnoreCase(mPigServer.getPigContext().getProperties().getProperty("verbose")),
+                                "Pig Stack Trace");
+                    } else {
                         mNumSucceededJobs++;
                     }
                 }

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Mon Aug 30 
17:40:11 2010
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -267,6 +268,14 @@ public final class PigStats {
     }
     
     /**
+     * Returns the properties associated with the script
+     */
+    public Properties getPigProperties() {
+        if (pigContext == null) return null;
+        return pigContext.getProperties();
+    }
+    
+    /**
      * Returns the DAG of the MR jobs spawned by the script
      */
     public JobGraph getJobGraph() {

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java Mon Aug 30 
17:40:11 2010
@@ -43,6 +43,7 @@ import org.apache.pig.tools.pigstats.Pig
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -73,6 +74,57 @@ public class TestPigRunner {
         cluster.shutDown();
     }
 
+    @Before
+    public void setUp() {
+        deleteAll(new File(OUTPUT_FILE));
+    }    
+    
+    @Test
+    public void testErrorLogFile() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, 
a2:int);");
+        w.println("B = foreach A generate StringSize(a0);");
+        w.println("store B into '" + OUTPUT_FILE + "';");
+        w.close();
+        
+        try {
+            String[] args = { "-x", "local", PIG_FILE };
+            PigStats stats = PigRunner.run(args, null);
+     
+            assertTrue(!stats.isSuccessful());
+ 
+            Properties props = stats.getPigProperties();
+            String logfile = props.getProperty("pig.logfile");
+            File f = new File(logfile);
+            assertTrue(f.exists());            
+        } finally {
+            new File(PIG_FILE).delete(); 
+        }
+    }
+    
+    @Test
+    public void testErrorLogFile2() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, 
a2:int);");
+        w.println("B = foreach A generate StringSize(a0);");
+        w.println("store B into '" + OUTPUT_FILE + "';");
+        w.close();
+        
+        try {
+            String[] args = { "-M", "-x", "local", PIG_FILE };
+            PigStats stats = PigRunner.run(args, null);
+     
+            assertTrue(!stats.isSuccessful());
+ 
+            Properties props = stats.getPigProperties();
+            String logfile = props.getProperty("pig.logfile");
+            File f = new File(logfile);
+            assertTrue(f.exists());          
+        } finally {
+            new File(PIG_FILE).delete();
+        }
+    }
+    
     @Test
     public void simpleTest() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -461,4 +513,13 @@ public class TestPigRunner {
         
     }
 
+    private void deleteAll(File d) {
+        if (!d.exists()) return;
+        if (d.isDirectory()) {
+            for (File f : d.listFiles()) {
+                deleteAll(f);
+            }
+        } 
+        d.delete();        
+    }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java?rev=990877&r1=990876&r2=990877&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java Mon Aug 30 
17:40:11 2010
@@ -40,6 +40,7 @@ import org.apache.pig.impl.util.LogUtils
 import org.apache.pig.tools.pigscript.parser.ParseException;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestPigStats  {
@@ -73,29 +74,38 @@ public class TestPigStats  {
     
     @Test
     public void testPigStatsAlias() throws Exception {
-        PigServer pig = new PigServer(ExecType.LOCAL);
-        pig.registerQuery("A = load 'input' as (name, age, gpa);");
-        pig.registerQuery("B = group A by name;");
-        pig.registerQuery("C = foreach B generate group, COUNT(A);");
-        pig.registerQuery("D = order C by $1;");
-        pig.registerQuery("E = limit D 10;");
-        pig.registerQuery("store E into 'output';");
-        
-        LogicalPlan lp = getLogicalPlan(pig);
-        PhysicalPlan pp = pig.getPigContext().getExecutionEngine().compile(lp,
-                null);
-        MROperPlan mp = getMRPlan(pp, pig.getPigContext());
-        
-        assertEquals(3, mp.getKeys().size());
-        
-        MapReduceOper mro = mp.getRoots().get(0);
-        assertEquals("A,B,C", getAlias(mro));
-        
-        mro = mp.getSuccessors(mro).get(0);
-        assertEquals("D", getAlias(mro));
-         
-        mro = mp.getSuccessors(mro).get(0);
-        assertEquals("D", getAlias(mro));
+        try {
+            PigServer pig = new PigServer(ExecType.LOCAL);
+            pig.registerQuery("A = load 'input' as (name, age, gpa);");
+            pig.registerQuery("B = group A by name;");
+            pig.registerQuery("C = foreach B generate group, COUNT(A);");
+            pig.registerQuery("D = order C by $1;");
+            pig.registerQuery("E = limit D 10;");
+            pig.registerQuery("store E into 'alias_output';");
+            
+            LogicalPlan lp = getLogicalPlan(pig);
+            PhysicalPlan pp = 
pig.getPigContext().getExecutionEngine().compile(lp,
+                    null);
+            MROperPlan mp = getMRPlan(pp, pig.getPigContext());
+            
+            assertEquals(3, mp.getKeys().size());
+            
+            MapReduceOper mro = mp.getRoots().get(0);
+            assertEquals("A,B,C", getAlias(mro));
+            
+            mro = mp.getSuccessors(mro).get(0);
+            assertEquals("D", getAlias(mro));
+             
+            mro = mp.getSuccessors(mro).get(0);
+            assertEquals("D", getAlias(mro));
+        } finally {
+            File outputfile = new File("alias_output");
+            if (outputfile.exists()) {
+                // Hadoop Local mode creates a directory
+                // Hence we need to delete a directory recursively
+                deleteDirectory(outputfile);
+            }
+        }
     }
     
     private void deleteDirectory( File dir ) {


Reply via email to