Author: rding
Date: Fri May 14 18:01:44 2010
New Revision: 944363

URL: http://svn.apache.org/viewvc?rev=944363&view=rev
Log:
PIG-1280: Add a pig-script-id to the JobConf of all jobs run in a pig-script

Added:
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
Modified:
    hadoop/pig/trunk/conf/pig-default.properties
    hadoop/pig/trunk/src/org/apache/pig/Main.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/trunk/conf/pig-default.properties
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/conf/pig-default.properties?rev=944363&r1=944362&r2=944363&view=diff
==============================================================================
--- hadoop/pig/trunk/conf/pig-default.properties (original)
+++ hadoop/pig/trunk/conf/pig-default.properties Fri May 14 18:01:44 2010
@@ -13,6 +13,9 @@ verbose=false
 #exectype local|mapreduce, mapreduce is default
 exectype=mapreduce
 
+#Enable insertion of information about the script into the hadoop job conf
+pig.script.info.enabled=true
+
 #Do not spill temp files smaller than this size (bytes)
 pig.spill.size.threshold=5000000
 #EXPERIMENT: Activate garbage collection when spilling a file bigger than this 
size (bytes)

Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=944363&r1=944362&r2=944363&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Fri May 14 18:01:44 2010
@@ -27,7 +27,9 @@ import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.StringReader;
 import java.io.StringWriter;
+import java.util.AbstractList;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Map;
@@ -58,6 +60,7 @@ import org.apache.pig.impl.util.JarManag
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.cmdline.CmdLineParser;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.impl.util.LogUtils;
@@ -271,10 +274,14 @@ public static void main(String args[])
         }
         // create the context with the parameter
         PigContext pigContext = new PigContext(execType, properties);
+        
+        // create the static script state object
+        String commandLine = 
LoadFunc.join((AbstractList<String>)Arrays.asList(args), " ");
+        ScriptState scriptState = ScriptState.start(commandLine);
 
         if(logFileName == null && !userSpecifiedLog) {
-           logFileName = 
validateLogFile(properties.getProperty("pig.logfile"), null);
-       }
+            logFileName = 
validateLogFile(properties.getProperty("pig.logfile"), null);
+        }
         
         pigContext.getProperties().setProperty("pig.logfile", (logFileName == 
null? "": logFileName));
      
@@ -323,6 +330,8 @@ public static void main(String args[])
                 new File(substFile).deleteOnExit();
             }
             
+            scriptState.setScript(new File(file));
+            
             grunt = new Grunt(pin, pigContext);
             gruntCalled = true;
             
@@ -354,6 +363,9 @@ public static void main(String args[])
                 if (i != 0) sb.append(' ');
                 sb.append(remainders[i]);
             }
+            
+            scriptState.setScript(sb.toString());
+            
             in = new BufferedReader(new StringReader(sb.toString()));
             grunt = new Grunt(in, pigContext);
             gruntCalled = true;
@@ -421,6 +433,8 @@ public static void main(String args[])
                                                    "PigLatin:" +new 
File(remainders[0]).getName()
             );
 
+            scriptState.setScript(new File(remainders[0]));
+            
             grunt = new Grunt(pin, pigContext);
             gruntCalled = true;
             

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=944363&r1=944362&r2=944363&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Fri May 14 18:01:44 2010
@@ -46,7 +46,6 @@ import org.apache.pig.ComparisonFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
-import org.apache.pig.ResourceSchema;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
@@ -80,13 +79,14 @@ import org.apache.pig.impl.io.NullablePa
 import org.apache.pig.impl.io.NullableText;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.ScriptState;
 
 
 /**
@@ -327,6 +327,13 @@ public class JobControlCompiler{
         ArrayList<POStore> storeLocations = new ArrayList<POStore>();
         Path tmpLocation = null;
         
+        // add settings for pig statistics
+        String setScriptProp = conf.get(ScriptState.INSERT_ENABLED, "true");
+        if (setScriptProp.equalsIgnoreCase("true")) {
+            ScriptState ss = ScriptState.get();
+            ss.addSettingsToConf(mro, conf);
+        }
+        
         //Set the User Name for this job. This will be
         //used as the working directory
         String user = System.getProperty("user.name");        
@@ -613,7 +620,7 @@ public class JobControlCompiler{
 
             conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
             conf.set(PIG_REDUCE_STORES, 
ObjectSerializer.serialize(reduceStores));
-      
+                        
             // Serialize the UDF specific context info.
             UDFContext.getUDFContext().serialize(conf);
             Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new 
ArrayList());

Added: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=944363&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Fri May 
14 18:01:44 2010
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.jar.Attributes;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.pig.LoadFunc;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.JarManager;
+
+/**
+ * ScriptState encapsulates settings for a Pig script that runs on a hadoop
+ * cluster. These settings are added to all MR jobs spawned by the script and
+ * in turn are persisted in the hadoop job xml. With the properties already in
+ * the job xml, users who want to know the relations between the script and MR
+ * jobs can derive them from the job xmls.
+ * <p>
+ * Instances are kept in a {@link ThreadLocal}: {@link #start(String)} binds a
+ * new instance (with a fresh random script id) to the calling thread and
+ * {@link #get()} retrieves it.
+ */
+public class ScriptState {
+
+    /**
+     * Keys of Pig settings added in MR job
+     */
+    private enum PIG_PROPERTY {
+        SCRIPT_ID           ("pig.script.id"),
+        SCRIPT              ("pig.script"),
+        LAUNCHER_HOST       ("pig.launcher.host"),
+        COMMAND_LINE        ("pig.command.line"),
+        HADOOP_VERSION      ("pig.hadoop.version"),
+        VERSION             ("pig.version"),
+        INPUT_DIRS          ("pig.input.dirs"),
+        MAP_OUTPUT_DIRS     ("pig.map.output.dirs"),
+        REDUCE_OUTPUT_DIRS  ("pig.reduce.output.dirs"),
+        FEATURE             ("pig.feature");
+
+        private final String displayStr;
+
+        private PIG_PROPERTY(String s) {
+            displayStr = s;
+        }
+
+        @Override
+        public String toString() { return displayStr; }
+    }
+
+    /**
+     * Features used in a Pig script. The constant name of the detected
+     * feature is written into the job conf under "pig.feature".
+     */
+    private enum PIG_FEATURE {
+        // NOTE(review): MERGE_JION and SKEWED_JION are misspelled, but the
+        // names are the values emitted into the job conf; renaming them
+        // changes what readers of "pig.feature" see.
+        MERGE_JION,
+        REPLICATED_JOIN,
+        SKEWED_JION,
+        COLLECTED_GROUP,
+        MERGE_COGROUP,
+        ORDER_BY,
+        DISTINCT,
+        STREAMING,
+        MAP_ONLY
+    }
+
+    /**
+     * Pig property that allows user to turn off the inclusion of settings
+     * in the jobs
+     */
+    public static final String INSERT_ENABLED = "pig.script.info.enabled";
+
+    private static final Log LOG = LogFactory.getLog(ScriptState.class);
+
+    /** State of the script run by the current thread. */
+    private static final ThreadLocal<ScriptState> tss =
+            new ThreadLocal<ScriptState>();
+
+    private final String id;
+
+    private String script;
+    private String commandLine;
+    private String feature;
+
+    // Computed lazily by the corresponding getters.
+    private String host;
+    private String pigVersion;
+    private String hadoopVersion;
+
+    /**
+     * Creates a new ScriptState with a random UUID as its script id, binds it
+     * to the current thread and returns it.
+     *
+     * @param commandLine the command line Pig was launched with
+     * @return the newly created state
+     */
+    public static ScriptState start(String commandLine) {
+        ScriptState ss = new ScriptState(UUID.randomUUID().toString());
+        ss.setCommandLine(commandLine);
+        tss.set(ss);
+        return ss;
+    }
+
+    private ScriptState(String id) {
+        this.id = id;
+        this.script = "";
+    }
+
+    /**
+     * Returns the ScriptState bound to the current thread, first starting one
+     * with an empty command line if {@link #start(String)} was never called
+     * on this thread.
+     */
+    public static ScriptState get() {
+        if (tss.get() == null) {
+            ScriptState.start("");
+        }
+        return tss.get();
+    }
+
+    /**
+     * Adds the script-level settings (Hadoop and Pig versions, script id,
+     * script text, launcher host, command line) and the job-level settings
+     * (output and input locations, detected feature) of the given MR operator
+     * to the job configuration. Failures to walk a plan are logged and the
+     * corresponding setting is skipped.
+     *
+     * @param mro the map-reduce operator the job is built from
+     * @param conf the job configuration the settings are added to
+     */
+    public void addSettingsToConf(MapReduceOper mro, Configuration conf) {
+        LOG.info("Pig script settings is added to the job");
+        conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());
+        conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion());
+        conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id);
+        conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());
+        conf.set(PIG_PROPERTY.LAUNCHER_HOST.toString(), getHostName());
+        conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine());
+
+        try {
+            conf.set(PIG_PROPERTY.MAP_OUTPUT_DIRS.toString(),
+                    getStoreLocations(mro.mapPlan));
+        } catch (VisitorException e) {
+            LOG.warn("unable to get the map stores", e);
+        }
+        if (!mro.reducePlan.isEmpty()) {
+            try {
+                conf.set(PIG_PROPERTY.REDUCE_OUTPUT_DIRS.toString(),
+                        getStoreLocations(mro.reducePlan));
+            } catch (VisitorException e) {
+                LOG.warn("unable to get the reduce stores", e);
+            }
+        }
+        try {
+            List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
+            if (lds != null && lds.size() > 0) {
+                ArrayList<String> inputDirs = new ArrayList<String>();
+                for (POLoad ld : lds) {
+                    inputDirs.add(ld.getLFile().getFileName());
+                }
+                conf.set(PIG_PROPERTY.INPUT_DIRS.toString(),
+                        LoadFunc.join(inputDirs, ","));
+            }
+        } catch (VisitorException e) {
+            LOG.warn("unable to get the map loads", e);
+        }
+
+        setPigFeature(mro, conf);
+    }
+
+    /**
+     * Returns the file names of all stores in the given plan, joined by ",".
+     */
+    private static String getStoreLocations(PhysicalPlan plan)
+            throws VisitorException {
+        LinkedList<POStore> stores = PlanHelper.getStores(plan);
+        ArrayList<String> outputDirs = new ArrayList<String>();
+        for (POStore st : stores) {
+            outputDirs.add(st.getSFile().getFileName());
+        }
+        return LoadFunc.join(outputDirs, ",");
+    }
+
+    /**
+     * Records the text of the given script file (blank lines and lines
+     * starting with "--" removed). The file is always closed afterwards.
+     *
+     * @param file the Pig script file
+     */
+    public void setScript(File file) {
+        BufferedReader reader = null;
+        try {
+            reader = new BufferedReader(new FileReader(file));
+            setScript(reader);
+        } catch (FileNotFoundException e) {
+            LOG.warn("unable to find the file", e);
+        } finally {
+            if (reader != null) {
+                try {
+                    reader.close();
+                } catch (IOException e) {
+                    LOG.warn("unable to close the script file", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Records the given script text verbatim.
+     *
+     * @param script the Pig script text
+     */
+    public void setScript(String script) {
+        this.script = script;
+    }
+
+    private String getScript() {
+        return (script == null) ? "" : script;
+    }
+
+    private String getHadoopVersion() {
+        if (hadoopVersion == null) {
+            hadoopVersion = VersionInfo.getVersion();
+        }
+        return (hadoopVersion == null) ? "" : hadoopVersion;
+    }
+
+    /**
+     * Reads the Implementation-Version of the "org/apache/pig" manifest entry
+     * from the jar that contains this class; returns "" when unavailable.
+     */
+    private String getPigVersion() {
+        if (pigVersion == null) {
+            String jarPath = JarManager.findContainingJar(ScriptState.class);
+            if (jarPath != null) {
+                JarFile jar = null;
+                try {
+                    jar = new JarFile(jarPath);
+                    Manifest manifest = jar.getManifest();
+                    Attributes attr = (manifest == null) ? null
+                            : manifest.getEntries().get("org/apache/pig");
+                    if (attr != null) {
+                        pigVersion = attr.getValue("Implementation-Version");
+                    }
+                } catch (Exception e) {
+                    LOG.warn("unable to read pigs manifest file", e);
+                } finally {
+                    if (jar != null) {
+                        try {
+                            jar.close();
+                        } catch (IOException e) {
+                            LOG.warn("unable to close pigs jar file", e);
+                        }
+                    }
+                }
+            }
+        }
+        return (pigVersion == null) ? "" : pigVersion;
+    }
+
+    private String getHostName() {
+        if (host == null) {
+            try {
+                InetAddress addr = InetAddress.getLocalHost();
+                host = addr.getHostName();
+            } catch (UnknownHostException e) {
+                LOG.warn("unable to get host name", e);
+            }
+        }
+        return (host == null) ? "" : host;
+    }
+
+    private String getCommandLine() {
+        return (commandLine == null) ? "" : commandLine;
+    }
+
+    private void setCommandLine(String commandLine) {
+        this.commandLine = commandLine;
+    }
+
+    /**
+     * Reads the script from the reader, dropping blank lines and lines that
+     * start with "--". Kept lines are trimmed and each is terminated with
+     * '\n' so tokens on adjacent lines are not fused together. The caller
+     * owns (and closes) the reader.
+     */
+    private void setScript(BufferedReader reader) {
+        StringBuilder sb = new StringBuilder();
+        try {
+            String line = reader.readLine();
+            while (line != null) {
+                line = line.trim();
+                if (line.length() > 0 && !line.startsWith("--")) {
+                    sb.append(line).append('\n');
+                }
+                line = reader.readLine();
+            }
+        } catch (IOException e) {
+            LOG.warn("unable to parse the script", e);
+        }
+        this.script = sb.toString();
+    }
+
+    /**
+     * Detects the feature used by the given MR operator and stores its name
+     * under "pig.feature" in the job conf ("" when none is detected).
+     * Skewed join and order-by are read from operator flags; otherwise the
+     * map plan (and the reduce plan, if any) is visited, and a job with an
+     * empty reduce plan and no other detected feature is marked MAP_ONLY.
+     */
+    private void setPigFeature(MapReduceOper mro, Configuration conf) {
+        feature = "";
+        if (mro.isSkewedJoin()) {
+            feature = PIG_FEATURE.SKEWED_JION.toString();
+        } else if (mro.isGlobalSort()) {
+            feature = PIG_FEATURE.ORDER_BY.toString();
+        } else {
+            try {
+                new FeatureVisitor(mro.mapPlan).visit();
+                if (mro.reducePlan.isEmpty()) {
+                    if (feature.isEmpty()) {
+                        feature = PIG_FEATURE.MAP_ONLY.toString();
+                    }
+                } else {
+                    new FeatureVisitor(mro.reducePlan).visit();
+                }
+            } catch (VisitorException e) {
+                LOG.warn("Feature visitor failed", e);
+            }
+        }
+        conf.set(PIG_PROPERTY.FEATURE.toString(), feature);
+    }
+
+    /**
+     * Walks a physical plan depth-first and records the feature of each
+     * recognised operator in the enclosing instance's {@code feature} field;
+     * the last operator visited wins.
+     */
+    private class FeatureVisitor extends PhyPlanVisitor {
+
+        public FeatureVisitor(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+        }
+
+        @Override
+        public void visitFRJoin(POFRJoin join) throws VisitorException {
+            feature = PIG_FEATURE.REPLICATED_JOIN.toString();
+        }
+
+        @Override
+        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+            feature = PIG_FEATURE.MERGE_JION.toString();
+        }
+
+        @Override
+        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+                throws VisitorException {
+            feature = PIG_FEATURE.MERGE_COGROUP.toString();
+        }
+
+        @Override
+        public void visitCollectedGroup(POCollectedGroup mg)
+                throws VisitorException {
+            feature = PIG_FEATURE.COLLECTED_GROUP.toString();
+        }
+
+        @Override
+        public void visitDistinct(PODistinct distinct) throws VisitorException {
+            feature = PIG_FEATURE.DISTINCT.toString();
+        }
+
+        @Override
+        public void visitStream(POStream stream) throws VisitorException {
+            feature = PIG_FEATURE.STREAMING.toString();
+        }
+    }
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=944363&r1=944362&r2=944363&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Fri May 14 
18:01:44 2010
@@ -2740,10 +2740,10 @@ public class TestMultiQuery {
         Util.createInputFile(cluster, "input.txt", new String[] {"hello", 
"bye"});
         String query = "a = load 'input.txt';" +
                        "b = filter a by $0 < 10;" +
-                       "store b into 'output1' using 
"+DummyStoreWithOutputFormat.class.getName()+"();" +
+                       "store b into 'output1' using 
"+DUMMY_STORE_WITH_OUTPUTFORMAT_CLASS+"();" +
                        "c = group a by $0;" +
                        "d = foreach c generate group, COUNT(a.$0);" +
-                       "store d into 'output2' using 
"+DummyStoreWithOutputFormat.class.getName()+"();" ;
+                       "store d into 'output2' using 
"+DUMMY_STORE_WITH_OUTPUTFORMAT_CLASS+"();" ;
         myPig.setBatchOn();
         Util.registerMultiLineQuery(myPig, query);
         myPig.executeBatch();
@@ -2773,50 +2773,31 @@ public class TestMultiQuery {
             Assert.fail();
         }
     }
+    
+    private static final String DUMMY_STORE_WITH_OUTPUTFORMAT_CLASS
+            = 
"org.apache.pig.test.TestMultiQuery\\$DummyStoreWithOutputFormat";
 
     public static class DummyStoreWithOutputFormat extends StoreFunc {
-        
-        /**
-         * 
-         */
+ 
         public DummyStoreWithOutputFormat() {
-            // TODO Auto-generated constructor stub
         }
 
-        
-        /* (non-Javadoc)
-         * @see org.apache.pig.StoreFunc#putNext(org.apache.pig.data.Tuple)
-         */
         @Override
         public void putNext(Tuple f) throws IOException {
-            // TODO Auto-generated method stub
-            
+ 
         }
 
-
-        /* (non-Javadoc)
-         * @see 
org.apache.pig.StoreFunc#checkSchema(org.apache.pig.ResourceSchema)
-         */
         @Override
         public void checkSchema(ResourceSchema s) throws IOException {
-            // TODO Auto-generated method stub
-            
+ 
         }
 
-
-        /* (non-Javadoc)
-         * @see org.apache.pig.StoreFunc#getOutputFormat()
-         */
         @Override
         public org.apache.hadoop.mapreduce.OutputFormat getOutputFormat()
                 throws IOException {
             return new DummyOutputFormat();
         }
 
-
-        /* (non-Javadoc)
-         * @see 
org.apache.pig.StoreFunc#prepareToWrite(org.apache.hadoop.mapreduce.RecordWriter)
-         */
         @Override
         public void prepareToWrite(
                 org.apache.hadoop.mapreduce.RecordWriter writer)
@@ -2824,20 +2805,12 @@ public class TestMultiQuery {
             
         }
 
-
-        /* (non-Javadoc)
-         * @see 
org.apache.pig.StoreFunc#relToAbsPathForStoreLocation(java.lang.String, 
org.apache.hadoop.fs.Path)
-         */
         @Override
         public String relToAbsPathForStoreLocation(String location, Path 
curDir)
                 throws IOException {
             return LoadFunc.getAbsolutePath(location, curDir);
         }
 
-
-        /* (non-Javadoc)
-         * @see org.apache.pig.StoreFunc#setStoreLocation(java.lang.String, 
org.apache.hadoop.mapreduce.Job)
-         */
         @Override
         public void setStoreLocation(String location, Job job)
                 throws IOException {
@@ -2852,7 +2825,7 @@ public class TestMultiQuery {
                 
     }
     
-    @SuppressWarnings({ "deprecation", "unchecked" })
+    @SuppressWarnings({ "unchecked" })
     public static class DummyOutputFormat
     extends OutputFormat<WritableComparable, Tuple> {
 


Reply via email to