Author: pradeepkth Date: Tue Feb 16 20:36:40 2010 New Revision: 910677 URL: http://svn.apache.org/viewvc?rev=910677&view=rev Log: PIG-1239: PigContext.connect() should not create a jobClient and jobClient should be created on demand when needed (pradeepkth)
Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=910677&r1=910676&r2=910677&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.6/CHANGES.txt Tue Feb 16 20:36:40 2010 @@ -151,6 +151,9 @@ BUG FIXES +PIG-1239: PigContext.connect() should not create a jobClient and jobClient +should be created on demand when needed (pradeepkth) + PIG-1213: Schema serialization is broken (pradeepkth) PIG-1191: POCast throws exception for certain sequences of LOAD, FILTER, Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=910677&r1=910676&r2=910677&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Feb 16 20:36:40 2010 @@ -90,7 +90,8 @@ protected DataStorage ds; - protected JobClient jobClient; + @SuppressWarnings("deprecation") + protected JobConf jobConf; // key: the operator key from the logical plan that originated the physical plan // val: the operator key for the root of the phyisical plan @@ -110,11 +111,12 @@ this.ds = null; // to be set in the init method - this.jobClient = null; + this.jobConf = null; } - public JobClient getJobClient() { - return this.jobClient; + @SuppressWarnings("deprecation") + public JobConf getJobConf() { + return this.jobConf; } public Map<OperatorKey, MapRedResult> getMaterializedResults() { @@ -134,6 +136,7 @@ init(this.pigContext.getProperties()); } + @SuppressWarnings("deprecation") public void init(Properties properties) throws ExecException { //First set the ssh socket factory setSSHFactory(); @@ -155,13 +158,13 @@ // Now add the settings from "properties" object to override any existing properties // All of the above is accomplished in the method call below - JobConf jobConf = new JobConf(); - jobConf.addResource("pig-cluster-hadoop-site.xml"); + JobConf jc = new JobConf(); + jc.addResource("pig-cluster-hadoop-site.xml"); //the method below alters the properties object by overriding the //hadoop properties with the values from properties and recomputing //the properties - recomputeProperties(jobConf, properties); + recomputeProperties(jc, properties); configuration = ConfigurationUtil.toConfiguration(properties); properties = ConfigurationUtil.toProperties(configuration); @@ -193,15 +196,8 @@ log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION)); } - try { - // Set job-specific configuration knobs - jobClient = new JobClient(new JobConf(configuration)); - } - catch (IOException e) { - int errCode = 6009; - String msg = "Failed to create job client:" + e.getMessage(); - throw new ExecException(msg, errCode, PigException.BUG, e); - } + // Set job-specific configuration knobs + jobConf = new JobConf(configuration); } public Properties getConfiguration() throws ExecException { Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=910677&r1=910676&r2=910677&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Feb 16 20:36:40 2010 @@ -1878,7 +1878,8 @@ * @throws PlanException * @throws VisitorException */ - protected Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, MapReduceOper prevJob, List<PhysicalPlan> transformPlans, + @SuppressWarnings("deprecation") + protected Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, MapReduceOper prevJob, List<PhysicalPlan> transformPlans, FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans, String udfClassName, String[] udfArgs, String sampleLdrClassName ) throws PlanException, VisitorException { @@ -2053,7 +2054,7 @@ if(val<=0) val = pigContext.defaultParallel; if (val<=0) - val = ((JobConf)((HExecutionEngine)eng).getJobClient().getConf()).getNumReduceTasks(); + val = ((JobConf)((HExecutionEngine)eng).getJobConf()).getNumReduceTasks(); if (val<=0) val = 1; } catch (Exception e) { Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=910677&r1=910676&r2=910677&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Feb 16 20:36:40 2010 @@ -109,7 +109,7 @@ ExecutionEngine exe = pc.getExecutionEngine(); ConfigurationValidator.validatePigProperties(exe.getConfiguration()); Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration()); - JobClient jobClient = ((HExecutionEngine)exe).getJobClient(); + JobClient jobClient = new JobClient(((HExecutionEngine)exe).getJobConf()); JobControlCompiler jcc = new JobControlCompiler(pc, conf); Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java?rev=910677&r1=910676&r2=910677&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java Tue Feb 16 20:36:40 2010 @@ -49,6 +49,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.JobID; import org.apache.pig.FuncSpec; @@ -71,6 +72,7 @@ import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor; import org.apache.pig.impl.util.LogUtils; +...@suppresswarnings("deprecation") public class GruntParser extends PigScriptParser { private final Log log = LogFactory.getLog(getClass()); @@ -203,13 +205,14 @@ // ExecutionEngine execEngine = mPigServer.getPigContext().getExecutionEngine(); if (execEngine instanceof HExecutionEngine) { - mJobClient = ((HExecutionEngine)execEngine).getJobClient(); + mJobConf = ((HExecutionEngine)execEngine).getJobConf(); } else { - mJobClient = null; + mJobConf = null; } } + @Override public void prompt() { if (mInteractive) { @@ -217,6 +220,7 @@ } } + @Override protected void quit() { mDone = true; @@ -226,6 +230,7 @@ return mDone; } + @Override protected void processDescribe(String alias) throws IOException { if(alias==null) { alias = mPigServer.getPigContext().getLastAlias(); @@ -233,6 +238,7 @@ mPigServer.dumpSchema(alias); } + @Override protected void processExplain(String alias, String script, boolean isVerbose, String format, String target, List<String> params, List<String> files) @@ -317,10 +323,12 @@ } } + @Override protected void printAliases() throws IOException { mPigServer.printAliases(); } + @Override protected void processRegister(String jar) throws IOException { mPigServer.registerJar(jar); } @@ -344,6 +352,7 @@ return writer.toString(); } + @Override protected void processScript(String script, boolean batch, List<String> params, List<String> files) throws IOException, ParseException { @@ -416,6 +425,7 @@ } } + @Override protected void processSet(String key, String value) throws IOException, ParseException { if (key.equals("debug")) { @@ -460,6 +470,7 @@ } } + @Override protected void processCat(String path) throws IOException { executeBatch(); @@ -503,6 +514,7 @@ } } + @Override protected void processCD(String path) throws IOException { ContainerDescriptor container; @@ -534,6 +546,7 @@ } } + @Override protected void processDump(String alias) throws IOException { Iterator<Tuple> result = mPigServer.openIterator(alias); @@ -544,16 +557,19 @@ } } + @Override protected void processIllustrate(String alias) throws IOException { mPigServer.getExamples(alias); } + @Override protected void processKill(String jobid) throws IOException { - if (mJobClient != null) { + if (mJobConf != null) { + JobClient jc = new JobClient(mJobConf); JobID id = JobID.forName(jobid); - RunningJob job = mJobClient.getJob(id); + RunningJob job = jc.getJob(id); if (job == null) System.out.println("Job with id " + jobid + " is not active"); else @@ -564,6 +580,7 @@ } } + @Override protected void processLS(String path) throws IOException { try { @@ -613,11 +630,13 @@ System.out.println(elem.toString() + "<r " + replication + ">\t" + len); } + @Override protected void processPWD() throws IOException { System.out.println(mDfs.getActiveContainer().toString()); } + @Override protected void printHelp() { System.out.println("Commands:"); @@ -636,6 +655,7 @@ System.out.println("quit"); } + @Override protected void processMove(String src, String dst) throws IOException { executeBatch(); @@ -655,6 +675,7 @@ } } + @Override protected void processCopy(String src, String dst) throws IOException { executeBatch(); @@ -670,6 +691,7 @@ } } + @Override protected void processCopyToLocal(String src, String dst) throws IOException { executeBatch(); @@ -685,6 +707,7 @@ } } + @Override protected void processCopyFromLocal(String src, String dst) throws IOException { executeBatch(); @@ -700,12 +723,14 @@ } } + @Override protected void processMkdir(String dir) throws IOException { ContainerDescriptor dirDescriptor = mDfs.asContainer(dir); dirDescriptor.create(); } + @Override protected void processPig(String cmd) throws IOException { int start = 1; @@ -721,6 +746,7 @@ } } + @Override protected void processRemove(String path, String options ) throws IOException { ElementDescriptor dfsPath = mDfs.asElement(path); @@ -738,6 +764,7 @@ } } + @Override protected void processFsCommand(String[] cmdTokens) throws IOException{ try { shell.run(cmdTokens); @@ -773,7 +800,7 @@ private DataStorage mDfs; private DataStorage mLfs; private Properties mConf; - private JobClient mJobClient; + private JobConf mJobConf; private boolean mDone; private boolean mLoadOnly; private ExplainState mExplain;