Author: rding
Date: Fri May 14 18:01:44 2010
New Revision: 944363
URL: http://svn.apache.org/viewvc?rev=944363&view=rev
Log:
PIG-1280: Add a pig-script-id to the JobConf of all jobs run in a pig-script
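Note: the properties added by this change (pig.script.id, pig.command.line,
pig.script, pig.feature, ...) end up in the configuration of every job the
script spawns, so tools that read the persisted job xml can group jobs by
script run. A minimal sketch of such a consumer, assuming only a Hadoop
Configuration for one of the spawned jobs (the class and method here are
hypothetical, not part of this patch):

    import org.apache.hadoop.conf.Configuration;

    public class ScriptIdReader {
        // "pig.script.id" is set by ScriptState.addSettingsToConf (below);
        // it is the same UUID for every MR job spawned by one script run.
        public static String scriptIdOf(Configuration jobConf) {
            return jobConf.get("pig.script.id", "");
        }
    }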
Added:
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
Modified:
hadoop/pig/trunk/conf/pig-default.properties
hadoop/pig/trunk/src/org/apache/pig/Main.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/trunk/conf/pig-default.properties
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/conf/pig-default.properties?rev=944363&r1=944362&r2=944363&view=diff
==============================================================================
--- hadoop/pig/trunk/conf/pig-default.properties (original)
+++ hadoop/pig/trunk/conf/pig-default.properties Fri May 14 18:01:44 2010
@@ -13,6 +13,9 @@ verbose=false
#exectype local|mapreduce, mapreduce is default
exectype=mapreduce
+#Enable insertion of information about the script into the hadoop job conf
+pig.script.info.enabled=true
+
#Do not spill temp files smaller than this size (bytes)
pig.spill.size.threshold=5000000
#EXPERIMENT: Activate garbage collection when spilling a file bigger than this size (bytes)
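
As a usage note, the insertion can be turned off by flipping the same key; a
minimal sketch, assuming the usual pig.properties override mechanism:

    # in pig.properties: do not add script info to the hadoop job conf
    pig.script.info.enabled=false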
Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=944363&r1=944362&r2=944363&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Fri May 14 18:01:44 2010
@@ -27,7 +27,9 @@ import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.StringReader;
import java.io.StringWriter;
+import java.util.AbstractList;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
@@ -58,6 +60,7 @@ import org.apache.pig.impl.util.JarManag
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.tools.cmdline.CmdLineParser;
import org.apache.pig.tools.grunt.Grunt;
import org.apache.pig.impl.util.LogUtils;
@@ -271,10 +274,14 @@ public static void main(String args[])
}
// create the context with the parameter
PigContext pigContext = new PigContext(execType, properties);
+
+ // create the static script state object
+        String commandLine = LoadFunc.join((AbstractList<String>)Arrays.asList(args), " ");
+ ScriptState scriptState = ScriptState.start(commandLine);
if(logFileName == null && !userSpecifiedLog) {
-            logFileName = validateLogFile(properties.getProperty("pig.logfile"), null);
-        }
+            logFileName = validateLogFile(properties.getProperty("pig.logfile"), null);
+        }
        pigContext.getProperties().setProperty("pig.logfile", (logFileName == null? "": logFileName));
@@ -323,6 +330,8 @@ public static void main(String args[])
new File(substFile).deleteOnExit();
}
+ scriptState.setScript(new File(file));
+
grunt = new Grunt(pin, pigContext);
gruntCalled = true;
@@ -354,6 +363,9 @@ public static void main(String args[])
if (i != 0) sb.append(' ');
sb.append(remainders[i]);
}
+
+ scriptState.setScript(sb.toString());
+
in = new BufferedReader(new StringReader(sb.toString()));
grunt = new Grunt(in, pigContext);
gruntCalled = true;
@@ -421,6 +433,8 @@ public static void main(String args[])
"PigLatin:" +new
File(remainders[0]).getName()
);
+ scriptState.setScript(new File(remainders[0]));
+
grunt = new Grunt(pin, pigContext);
gruntCalled = true;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=944363&r1=944362&r2=944363&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri May 14 18:01:44 2010
@@ -46,7 +46,6 @@ import org.apache.pig.ComparisonFunc;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
-import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
@@ -80,13 +79,14 @@ import org.apache.pig.impl.io.NullablePa
import org.apache.pig.impl.io.NullableText;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.ScriptState;
/**
@@ -327,6 +327,13 @@ public class JobControlCompiler{
ArrayList<POStore> storeLocations = new ArrayList<POStore>();
Path tmpLocation = null;
+ // add settings for pig statistics
+ String setScriptProp = conf.get(ScriptState.INSERT_ENABLED, "true");
+ if (setScriptProp.equalsIgnoreCase("true")) {
+ ScriptState ss = ScriptState.get();
+ ss.addSettingsToConf(mro, conf);
+ }
+
//Set the User Name for this job. This will be
//used as the working directory
String user = System.getProperty("user.name");
@@ -613,7 +620,7 @@ public class JobControlCompiler{
conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
        conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
-
+
// Serialize the UDF specific context info.
UDFContext.getUDFContext().serialize(conf);
        Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList());
Added: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=944363&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Fri May 14 18:01:44 2010
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.jar.Attributes;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.JarManager;
+
+/**
+ * ScriptState encapsulates settings for a Pig script that runs on a hadoop
+ * cluster. These settings are added to all MR jobs spawned by the script and
+ * in turn are persisted in the hadoop job xml. Since the properties end up in
+ * the job xml, users who want to relate MR jobs back to the originating
+ * script can derive that mapping from the job xmls.
+ */
+public class ScriptState {
+
+ /**
+ * Keys of Pig settings added in MR job
+ */
+ private enum PIG_PROPERTY {
+ SCRIPT_ID ("pig.script.id"),
+ SCRIPT ("pig.script"),
+ LAUNCHER_HOST ("pig.launcher.host"),
+ COMMAND_LINE ("pig.command.line"),
+ HADOOP_VERSION ("pig.hadoop.version"),
+ VERSION ("pig.version"),
+ INPUT_DIRS ("pig.input.dirs"),
+ MAP_OUTPUT_DIRS ("pig.map.output.dirs"),
+ REDUCE_OUTPUT_DIRS ("pig.reduce.output.dirs"),
+ FEATURE ("pig.feature");
+
+ private String displayStr;
+
+ private PIG_PROPERTY(String s) {
+ displayStr = s;
+ }
+
+ @Override
+ public String toString() { return displayStr; }
+ };
+
+ /**
+ * Features used in a Pig script
+ */
+    private enum PIG_FEATURE {
+        MERGE_JOIN,
+        REPLICATED_JOIN,
+        SKEWED_JOIN,
+        COLLECTED_GROUP,
+        MERGE_COGROUP,
+        ORDER_BY,
+        DISTINCT,
+        STREAMING,
+        MAP_ONLY;
+    };
+
+ /**
+     * Pig property that allows the user to turn off the inclusion of these
+     * settings in the jobs
+ */
+ public static final String INSERT_ENABLED = "pig.script.info.enabled";
+
+ private static final Log LOG = LogFactory.getLog(ScriptState.class);
+
+    private static ThreadLocal<ScriptState> tss = new ThreadLocal<ScriptState>();
+
+ private String id;
+
+ private String script;
+ private String commandLine;
+ private String feature;
+
+ private String host;
+ private String pigVersion;
+    private String hadoopVersion;
+
+ public static ScriptState start(String commandLine) {
+ ScriptState ss = new ScriptState(UUID.randomUUID().toString());
+ ss.setCommandLine(commandLine);
+ tss.set(ss);
+ return ss;
+ }
+
+ private ScriptState(String id) {
+ this.id = id;
+ this.script = "";
+ }
+
+ public static ScriptState get() {
+ if (tss.get() == null) {
+ ScriptState.start("");
+ }
+ return tss.get();
+ }
+
+ public void addSettingsToConf(MapReduceOper mro, Configuration conf) {
+        LOG.info("Pig script settings are added to the job");
+ conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());
+ conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion());
+ conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id);
+ conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());
+ conf.set(PIG_PROPERTY.LAUNCHER_HOST.toString(), getHostName());
+ conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine());
+
+ try {
+ LinkedList<POStore> stores = PlanHelper.getStores(mro.mapPlan);
+ ArrayList<String> outputDirs = new ArrayList<String>();
+ for (POStore st: stores) {
+ outputDirs.add(st.getSFile().getFileName());
+ }
+            conf.set(PIG_PROPERTY.MAP_OUTPUT_DIRS.toString(), LoadFunc.join(outputDirs, ","));
+ } catch (VisitorException e) {
+ LOG.warn("unable to get the map stores", e);
+ }
+ if (!mro.reducePlan.isEmpty()) {
+ try {
+                LinkedList<POStore> stores = PlanHelper.getStores(mro.reducePlan);
+ ArrayList<String> outputDirs = new ArrayList<String>();
+ for (POStore st: stores) {
+ outputDirs.add(st.getSFile().getFileName());
+ }
+                conf.set(PIG_PROPERTY.REDUCE_OUTPUT_DIRS.toString(), LoadFunc.join(outputDirs, ","));
+ } catch (VisitorException e) {
+ LOG.warn("unable to get the reduce stores", e);
+ }
+ }
+ try {
+ List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
+ ArrayList<String> inputDirs = new ArrayList<String>();
+ if (lds != null && lds.size() > 0){
+ for (POLoad ld : lds) {
+ inputDirs.add(ld.getLFile().getFileName());
+ }
+                conf.set(PIG_PROPERTY.INPUT_DIRS.toString(), LoadFunc.join(inputDirs, ","));
+ }
+ } catch (VisitorException e) {
+ LOG.warn("unable to get the map loads", e);
+ }
+
+ setPigFeature(mro, conf);
+ }
+
+ public void setScript(File file) {
+ try {
+ setScript(new BufferedReader(new FileReader(file)));
+ } catch (FileNotFoundException e) {
+ LOG.warn("unable to find the file", e);
+ }
+ }
+
+ public void setScript(String script) {
+ this.script = script;
+ }
+
+ private String getScript() {
+ return (script == null) ? "" : script;
+ }
+
+ private String getHadoopVersion() {
+        if (hadoopVersion == null) {
+            hadoopVersion = VersionInfo.getVersion();
+        }
+        return (hadoopVersion == null) ? "" : hadoopVersion;
+ }
+
+ private String getPigVersion() {
+ if (pigVersion == null) {
+            String findContainingJar = JarManager.findContainingJar(ScriptState.class);
+ try {
+ JarFile jar = new JarFile(findContainingJar);
+ final Manifest manifest = jar.getManifest();
+ final Map <String,Attributes> attrs = manifest.getEntries();
+ Attributes attr = attrs.get("org/apache/pig");
+ pigVersion = attr.getValue("Implementation-Version");
+ } catch (Exception e) {
+                LOG.warn("unable to read pig's manifest file", e);
+ }
+ }
+ return (pigVersion == null) ? "" : pigVersion;
+ }
+
+ private String getHostName() {
+ if (host == null) {
+ try {
+ InetAddress addr = InetAddress.getLocalHost();
+ host = addr.getHostName();
+ } catch (UnknownHostException e) {
+ LOG.warn("unable to get host name", e);
+ }
+ }
+ return (host == null) ? "" : host;
+ }
+
+ private String getCommandLine() {
+ return (commandLine == null) ? "" : commandLine;
+ }
+
+ private void setCommandLine(String commandLine) {
+ this.commandLine = commandLine;
+ }
+
+ private void setScript(BufferedReader reader) {
+ StringBuilder sb = new StringBuilder();
+ try {
+ String line = reader.readLine();
+ while (line != null) {
+ line = line.trim();
+ if (line.length() > 0 && !line.startsWith("--")) {
+ sb.append(line);
+ }
+ line = reader.readLine();
+ }
+ } catch (IOException e) {
+ LOG.warn("unable to parse the script", e);
+ }
+ this.script = sb.toString();
+ }
+
+ private void setPigFeature(MapReduceOper mro, Configuration conf) {
+ feature = "";
+ if (mro.isSkewedJoin()) {
+            feature = PIG_FEATURE.SKEWED_JOIN.toString();
+ } else if (mro.isGlobalSort()) {
+ feature = PIG_FEATURE.ORDER_BY.toString();
+ } else {
+ try {
+ new FeatureVisitor(mro.mapPlan).visit();
+ if (mro.reducePlan.isEmpty()) {
+ feature = feature.isEmpty() ?
+ PIG_FEATURE.MAP_ONLY.toString() : feature;
+ } else {
+ new FeatureVisitor(mro.reducePlan).visit();
+ }
+ } catch (VisitorException e) {
+ LOG.warn("Feature visitor failed", e);
+ }
+ }
+ conf.set(PIG_PROPERTY.FEATURE.toString(), feature);
+ }
+
+ private class FeatureVisitor extends PhyPlanVisitor {
+
+ public FeatureVisitor(PhysicalPlan plan) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+ plan));
+ }
+
+ @Override
+ public void visitFRJoin(POFRJoin join) throws VisitorException {
+ feature = PIG_FEATURE.REPLICATED_JOIN.toString();
+ }
+
+ @Override
+ public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+            feature = PIG_FEATURE.MERGE_JOIN.toString();
+ }
+
+ @Override
+ public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+ throws VisitorException {
+ feature = PIG_FEATURE.MERGE_COGROUP.toString();
+ }
+
+ @Override
+ public void visitCollectedGroup(POCollectedGroup mg)
+ throws VisitorException {
+ feature = PIG_FEATURE.COLLECTED_GROUP.toString();
+ }
+
+ @Override
+        public void visitDistinct(PODistinct distinct) throws VisitorException {
+ feature = PIG_FEATURE.DISTINCT.toString();
+ }
+
+ @Override
+ public void visitStream(POStream stream) throws VisitorException {
+ feature = PIG_FEATURE.STREAMING.toString();
+ }
+ }
+}
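
For reference, the intended lifecycle of the thread-local state above, using
only the public methods of the added class (the script name is made up):

    // In Main: one ScriptState per script run, bound to the launching thread.
    ScriptState ss = ScriptState.start("pig -f myscript.pig");
    ss.setScript(new java.io.File("myscript.pig"));  // hypothetical script file

    // Later, JobControlCompiler on the same thread picks it up and copies
    // the settings into each spawned job's conf:
    ScriptState same = ScriptState.get();  // same instance as ss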
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=944363&r1=944362&r2=944363&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Fri May 14 18:01:44 2010
@@ -2740,10 +2740,10 @@ public class TestMultiQuery {
        Util.createInputFile(cluster, "input.txt", new String[] {"hello", "bye"});
String query = "a = load 'input.txt';" +
"b = filter a by $0 < 10;" +
-                "store b into 'output1' using "+DummyStoreWithOutputFormat.class.getName()+"();" +
+                "store b into 'output1' using "+DUMMY_STORE_WITH_OUTPUTFORMAT_CLASS+"();" +
"c = group a by $0;" +
"d = foreach c generate group, COUNT(a.$0);" +
-                "store d into 'output2' using "+DummyStoreWithOutputFormat.class.getName()+"();" ;
+                "store d into 'output2' using "+DUMMY_STORE_WITH_OUTPUTFORMAT_CLASS+"();" ;
myPig.setBatchOn();
Util.registerMultiLineQuery(myPig, query);
myPig.executeBatch();
@@ -2773,50 +2773,31 @@ public class TestMultiQuery {
Assert.fail();
}
}
+
+    private static final String DUMMY_STORE_WITH_OUTPUTFORMAT_CLASS
+        = "org.apache.pig.test.TestMultiQuery\\$DummyStoreWithOutputFormat";
public static class DummyStoreWithOutputFormat extends StoreFunc {
-
- /**
- *
- */
+
public DummyStoreWithOutputFormat() {
- // TODO Auto-generated constructor stub
}
-
- /* (non-Javadoc)
- * @see org.apache.pig.StoreFunc#putNext(org.apache.pig.data.Tuple)
- */
@Override
public void putNext(Tuple f) throws IOException {
- // TODO Auto-generated method stub
-
+
}
-
- /* (non-Javadoc)
-         * @see org.apache.pig.StoreFunc#checkSchema(org.apache.pig.ResourceSchema)
- */
@Override
public void checkSchema(ResourceSchema s) throws IOException {
- // TODO Auto-generated method stub
-
+
}
-
- /* (non-Javadoc)
- * @see org.apache.pig.StoreFunc#getOutputFormat()
- */
@Override
public org.apache.hadoop.mapreduce.OutputFormat getOutputFormat()
throws IOException {
return new DummyOutputFormat();
}
-
- /* (non-Javadoc)
-         * @see org.apache.pig.StoreFunc#prepareToWrite(org.apache.hadoop.mapreduce.RecordWriter)
- */
@Override
public void prepareToWrite(
org.apache.hadoop.mapreduce.RecordWriter writer)
@@ -2824,20 +2805,12 @@ public class TestMultiQuery {
}
-
- /* (non-Javadoc)
-         * @see org.apache.pig.StoreFunc#relToAbsPathForStoreLocation(java.lang.String, org.apache.hadoop.fs.Path)
- */
@Override
        public String relToAbsPathForStoreLocation(String location, Path curDir)
throws IOException {
return LoadFunc.getAbsolutePath(location, curDir);
}
-
- /* (non-Javadoc)
-         * @see org.apache.pig.StoreFunc#setStoreLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
- */
@Override
public void setStoreLocation(String location, Job job)
throws IOException {
@@ -2852,7 +2825,7 @@ public class TestMultiQuery {
}
- @SuppressWarnings({ "deprecation", "unchecked" })
+ @SuppressWarnings({ "unchecked" })
public static class DummyOutputFormat
extends OutputFormat<WritableComparable, Tuple> {