Author: rding
Date: Wed Jul 21 17:39:14 2010
New Revision: 966326

URL: http://svn.apache.org/viewvc?rev=966326&view=rev
Log:
PIG-1478: Add progress notification listener to PigRunner API

Added:
    
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/Main.java
    hadoop/pig/trunk/src/org/apache/pig/PigRunner.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Jul 21 17:39:14 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1478: Add progress notification listener to PigRunner API (rding)
+
 PIG-1472: Optimize serialization/deserialization between Map and Reduce and 
between MR jobs (thejas)
 
 PIG-1389: Implement Pig counter to track number of rows for each input files

Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Wed Jul 21 17:39:14 2010
@@ -62,6 +62,7 @@ import org.apache.pig.impl.util.JarManag
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.cmdline.CmdLineParser;
@@ -100,10 +101,10 @@ public class Main {
 public static void main(String args[]) {
     GenericOptionsParser parser = new GenericOptionsParser(args);
     String[] pigArgs = parser.getRemainingArgs();
-    System.exit(run(pigArgs));
+    System.exit(run(pigArgs, null));
 }
 
-static int run(String args[]) {
+static int run(String args[], PigProgressNotificationListener listener) {
     int rc = 1;
     Properties properties = new Properties();
     PropertiesUtil.loadDefaultProperties(properties);
@@ -287,6 +288,10 @@ static int run(String args[]) {
         // create the static script state object
         String commandLine = 
LoadFunc.join((AbstractList<String>)Arrays.asList(args), " ");
         ScriptState scriptState = ScriptState.start(commandLine);
+        if (listener != null) {
+            scriptState.registerListener(listener);
+        }
+        
 
         if(logFileName == null && !userSpecifiedLog) {
             logFileName = 
validateLogFile(properties.getProperty("pig.logfile"), null);

Modified: hadoop/pig/trunk/src/org/apache/pig/PigRunner.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigRunner.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigRunner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigRunner.java Wed Jul 21 17:39:14 2010
@@ -20,6 +20,7 @@ package org.apache.pig;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 
@@ -45,10 +46,10 @@ public abstract class PigRunner {
         public final static int THROWABLE_EXCEPTION = 8;
     }
     
-    public static PigStats run(String[] args) {
+    public static PigStats run(String[] args, PigProgressNotificationListener 
listener) {
         GenericOptionsParser parser = new GenericOptionsParser(args);
         String[] pigArgs = parser.getRemainingArgs();
-        return PigStatsUtil.getPigStats(Main.run(pigArgs));
+        return PigStatsUtil.getPigStats(Main.run(pigArgs, listener));
     }
     
 }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Wed Jul 21 17:39:14 2010
@@ -63,8 +63,10 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.util.ConfigurationValidator;
 import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.ScriptState;
 
 
 /**
@@ -142,6 +144,8 @@ public class MapReduceLauncher extends L
             List<Job> jobsWithoutIds = jc.getWaitingJobs();
             log.info(jobsWithoutIds.size() +" map-reduce job(s) waiting for 
submission.");
             
+            
ScriptState.get().emitJobsSubmittedNotification(jobsWithoutIds.size());
+            
             String jobTrackerAdd;
             String port;
             String jobTrackerLoc;
@@ -184,6 +188,9 @@ public class MapReduceLauncher extends L
                                        log.info("More information at: 
http://"+ jobTrackerLoc+
                                                        
"/jobdetails.jsp?jobid="+job.getAssignedJobID());
                                }  
+                               
+                               ScriptState.get().emitJobStartedNotification(
+                                job.getAssignedJobID().toString());            
            
                        }
                        else{
                                // This job is not assigned an id yet.
@@ -194,8 +201,11 @@ public class MapReduceLauncher extends L
                double prog = (numMRJobsCompl+calculateProgress(jc, 
jobClient))/totalMRJobs;
                if(prog>=(lastProg+0.01)){
                        int perCom = (int)(prog * 100);
-                       if(perCom!=100)
+                       if(perCom!=100) {
                                log.info( perCom + "% complete");
+                               
+                               
ScriptState.get().emitProgressUpdatedNotification(perCom);
+                       }
                }
                lastProg = prog;
             }
@@ -266,6 +276,8 @@ public class MapReduceLauncher extends L
             jc.stop(); 
         }
 
+        ScriptState.get().emitProgressUpdatedNotification(100);
+        
         log.info( "100% complete");
              
         boolean failed = false;

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Wed Jul 21 
17:39:14 2010
@@ -422,6 +422,10 @@ public final class JobStats extends Oper
                 ds.setPOStore(sto);
                 ds.setConf(conf);
                 outputs.add(ds);
+                
+                if (state == JobState.SUCCESS) {
+                    ScriptState.get().emitOutputCompletedNotification(ds);
+                }
             }
         } else {
             for (POStore sto : mapStores) {
@@ -472,6 +476,10 @@ public final class JobStats extends Oper
         ds.setPOStore(sto);
         ds.setConf(conf);
         outputs.add(ds);
+        
+        if (state == JobState.SUCCESS) {
+            ScriptState.get().emitOutputCompletedNotification(ds);
+        }
     }
        
     void addInputStatistics() {

Added: 
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java?rev=966326&view=auto
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
 (added)
+++ 
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
 Wed Jul 21 17:39:14 2010
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+...@interfaceaudience.public
+...@interfacestability.evolving
+public interface PigProgressNotificationListener extends 
java.util.EventListener {
+    
+    public void launchStartedNotification(int numJobsToLaunch);
+    
+    public void jobsSubmittedNotification(int numJobsSubmitted);
+    
+    public void jobStartedNotification(String assignedJobId);
+    
+    public void jobFinishedNotification(JobStats jobStats);
+    
+    public void jobFailedNotification(JobStats jobStats);
+    
+    public void outputCompletedNotification(OutputStats outputStats);
+    
+    public void progressUpdatedNotification(int progress);
+    
+    public void launchCompletedNotification(int numJobsSucceeded);
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Wed 
Jul 21 17:39:14 2010
@@ -166,6 +166,8 @@ public abstract class PigStatsUtil {
             JobControlCompiler jcc, MROperPlan plan) {
         PigStats ps = PigStats.start();
         ps.start(pc, client, jcc, plan);
+        
+        ScriptState.get().emitLaunchStartedNotification(plan.size());
     }
      
     /**
@@ -184,6 +186,8 @@ public abstract class PigStatsUtil {
                 LOG.error("Error message: " + errMsg);
             }            
         }
+        ScriptState.get().emitLaunchCompletedNotification(
+                ps.getNumberSuccessfulJobs());
         if (display) ps.display();
     }
     
@@ -238,16 +242,20 @@ public abstract class PigStatsUtil {
      */
     public static void accumulateStats(JobControl jc) {
         PigStats ps = PigStats.get();
-  
+        ScriptState ss = ScriptState.get();
+        
         for (Job job : jc.getSuccessfulJobs()) {            
-            accumulateSuccessStatistics(ps, job);
+            JobStats js = accumulateSuccessStatistics(ps, job);
+            if (js != null) {
+                ss.emitjobFinishedNotification(js);
+            }
         }
         
         for (Job job : jc.getFailedJobs()) {                      
-            JobStats js = 
-                addFailedJobStats(ps, job);
+            JobStats js = addFailedJobStats(ps, job);
             if (js != null) {
-                js.setErrorMsg(job.getMessage());                    
+                js.setErrorMsg(job.getMessage());    
+                ss.emitJobFailedNotification(js);
             } else {
                 LOG.warn("unable to add failed job stats: " + job);
             }
@@ -285,7 +293,7 @@ public abstract class PigStatsUtil {
         return js;
     }
     
-    private static void accumulateSuccessStatistics(PigStats ps, Job job) {
+    private static JobStats accumulateSuccessStatistics(PigStats ps, Job job) {
         JobStats js = ps.addJobStats(job);
         if (js == null) {
             LOG.warn("unable to add job stats");
@@ -312,6 +320,7 @@ public abstract class PigStatsUtil {
             
             js.addInputStatistics();
         }
+        return js;
     }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Wed Jul 
21 17:39:14 2010
@@ -177,6 +177,9 @@ public class ScriptState {
     private Map<MapReduceOper, String> featureMap = null;
     private Map<MapReduceOper, String> aliasMap = null;
     
+    private List<PigProgressNotificationListener> listeners
+            = new ArrayList<PigProgressNotificationListener>();
+    
     public static ScriptState start(String commandLine) {
         ScriptState ss = new ScriptState(UUID.randomUUID().toString());
         ss.setCommandLine(commandLine);
@@ -196,6 +199,58 @@ public class ScriptState {
         return tss.get();
     }           
        
+    public void registerListener(PigProgressNotificationListener listener) {
+        listeners.add(listener);
+    }
+        
+    public void emitLaunchStartedNotification(int numJobsToLaunch) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.launchStartedNotification(numJobsToLaunch);
+        }
+    }
+    
+    public void emitJobsSubmittedNotification(int numJobsSubmitted) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.jobsSubmittedNotification(numJobsSubmitted);
+        }        
+    }
+    
+    public void emitJobStartedNotification(String assignedJobId) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.jobStartedNotification(assignedJobId);
+        }
+    }
+    
+    public void emitjobFinishedNotification(JobStats jobStats) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.jobFinishedNotification(jobStats);
+        }
+    }
+    
+    public void emitJobFailedNotification(JobStats jobStats) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.jobFailedNotification(jobStats);
+        }
+    }
+    
+    public void emitOutputCompletedNotification(OutputStats outputStats) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.outputCompletedNotification(outputStats);
+        }
+    }
+    
+    public void emitProgressUpdatedNotification(int progress) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.progressUpdatedNotification(progress);
+        }
+    }
+    
+    public void emitLaunchCompletedNotification(int numJobsSucceeded) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.launchCompletedNotification(numJobsSucceeded);
+        }
+    }
+    
     public void addSettingsToConf(MapReduceOper mro, Configuration conf) {
         LOG.info("Pig script settings are added to the job");
         conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=966326&r1=966325&r2=966326&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java Wed Jul 21 
17:39:14 2010
@@ -35,6 +35,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.junit.After;
@@ -81,7 +82,7 @@ public class TestPigRunner {
         
         try {
             String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args);
+            PigStats stats = PigRunner.run(args, new 
TestNotificationListener());
      
             assertTrue(stats.isSuccessful());
             
@@ -109,7 +110,7 @@ public class TestPigRunner {
         w.close();
         String[] args = { PIG_FILE };
         try {
-            PigStats stats = PigRunner.run(args);
+            PigStats stats = PigRunner.run(args, new 
TestNotificationListener());
             assertTrue(stats.isSuccessful());
             assertTrue(stats.getJobGraph().size() == 3);
             assertTrue(stats.getJobGraph().getSinks().size() == 1);
@@ -147,7 +148,7 @@ public class TestPigRunner {
         
         try {
             String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args);
+            PigStats stats = PigRunner.run(args, new 
TestNotificationListener());
             assertTrue(stats.isSuccessful());
             assertTrue(stats.getJobGraph().size() == 1);
             assertEquals(5, stats.getRecordWritten());
@@ -186,7 +187,7 @@ public class TestPigRunner {
         
         try {
             String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args);
+            PigStats stats = PigRunner.run(args, new 
TestNotificationListener());
             assertTrue(stats.isSuccessful());
             assertTrue(stats.getJobGraph().size() == 1);
             assertEquals(4, stats.getRecordWritten());           
@@ -218,7 +219,7 @@ public class TestPigRunner {
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
         String[] args = { "-c", PIG_FILE };
-        PigStats stats = PigRunner.run(args);
+        PigStats stats = PigRunner.run(args, null);
         assertTrue(stats.getReturnCode() == ReturnCode.PIG_EXCEPTION);
         assertTrue(stats.getErrorCode() == 1000);
         assertEquals("Error during parsing. Invalid alias: a in {a0: int,a1: 
int,a2: int}", 
@@ -228,14 +229,14 @@ public class TestPigRunner {
     @Test
     public void simpleNegativeTest2() throws Exception {
         String[] args = { "-c", "-e", "this is a test" };
-        PigStats stats = PigRunner.run(args);        
+        PigStats stats = PigRunner.run(args, new TestNotificationListener());  
      
         assertTrue(stats.getReturnCode() == ReturnCode.ILLEGAL_ARGS);
     }
 
     @Test
     public void simpleNegativeTest3() throws Exception {
         String[] args = { "-c", "-y" };
-        PigStats stats = PigRunner.run(args);     
+        PigStats stats = PigRunner.run(args, new TestNotificationListener());  
   
         assertTrue(stats.getReturnCode() == ReturnCode.PARSE_EXCEPTION);
         assertEquals("Found unknown option (-y) at position 2", 
                 stats.getErrorMessage());
@@ -257,7 +258,7 @@ public class TestPigRunner {
         
         try {
             String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args);             
+            PigStats stats = PigRunner.run(args, null);             
             assertTrue(!stats.isSuccessful());            
             assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
             assertTrue(stats.getJobGraph().size() == 2);
@@ -296,4 +297,62 @@ public class TestPigRunner {
         String name = PigStatsUtil.getMultiInputsCounterName(s);
         assertEquals(PigStatsUtil.MULTI_INPUTS_RECORD_COUNTER + "batchtest", 
name);
     }
+    
+    private static class TestNotificationListener implements 
PigProgressNotificationListener {
+        
+        private int numJobsToLaunch = 0;
+        private int numJobsSubmitted = 0;
+        private int numJobStarted = 0;
+        private int numJobFinished = 0;
+        
+        @Override
+        public void launchStartedNotification(int numJobsToLaunch) {
+            System.out.println("++++ numJobsToLaunch: " + numJobsToLaunch);  
+            this.numJobsToLaunch = numJobsToLaunch;
+        }
+
+        @Override
+        public void jobFailedNotification(JobStats jobStats) {
+            System.out.println("++++ job failed: " + jobStats.getJobId());     
      
+        }
+
+        @Override
+        public void jobFinishedNotification(JobStats jobStats) {
+            System.out.println("++++ job finished: " + jobStats.getJobId());  
+            numJobFinished++;            
+        }
+
+        @Override
+        public void jobStartedNotification(String assignedJobId) {
+            System.out.println("++++ job started: " + assignedJobId);   
+            numJobStarted++;
+        }
+
+        @Override
+        public void jobsSubmittedNotification(int numJobsSubmitted) {
+            System.out.println("++++ jobs submitted: " + numJobsSubmitted);
+            this.numJobsSubmitted += numJobsSubmitted;
+        }
+
+        @Override
+        public void launchCompletedNotification(int numJobsSucceeded) {
+            System.out.println("++++ numJobsSucceeded: " + numJobsSucceeded);  
 
+            System.out.println("");
+            assertEquals(this.numJobsToLaunch, numJobsSucceeded);
+            assertEquals(this.numJobsSubmitted, numJobsSucceeded);
+            assertEquals(this.numJobStarted, numJobsSucceeded);
+            assertEquals(this.numJobFinished, numJobsSucceeded);
+        }
+
+        @Override
+        public void outputCompletedNotification(OutputStats outputStats) {
+            System.out.println("++++ output done: " + 
outputStats.getLocation());
+        }
+
+        @Override
+        public void progressUpdatedNotification(int progress) {
+            System.out.println("++++ progress: " + progress + "%");           
+        }
+        
+    }
 }


Reply via email to