Author: pradeepkth
Date: Tue Feb 16 20:37:31 2010
New Revision: 910678

URL: http://svn.apache.org/viewvc?rev=910678&view=rev
Log:
PIG-1239: PigContext.connect() should not create a jobClient and jobClient 
should be created on demand when needed (pradeepkth)

Modified:
    hadoop/pig/branches/load-store-redesign/CHANGES.txt
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/grunt/GruntParser.java

Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=910678&r1=910677&r2=910678&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Tue Feb 16 20:37:31 2010
@@ -121,6 +121,9 @@
 
 BUG FIXES
 
+PIG-1239: PigContext.connect() should not create a jobClient and jobClient
+should be created on demand when needed (pradeepkth)
+
 PIG-1154: Local Mode fails when hadoop config directory is specified in 
             classpath (ankit.modi via gates)
 

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=910678&r1=910677&r2=910678&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
 Tue Feb 16 20:37:31 2010
@@ -77,7 +77,8 @@
     
     protected DataStorage ds;
     
-    protected JobClient jobClient;
+    @SuppressWarnings("deprecation")
+    protected JobConf jobConf;
 
     // key: the operator key from the logical plan that originated the 
physical plan
     // val: the operator key for the root of the phyisical plan
@@ -97,11 +98,12 @@
         this.ds = null;
         
         // to be set in the init method
-        this.jobClient = null;
+        this.jobConf = null;
     }
     
-    public JobClient getJobClient() {
-        return this.jobClient;
+    @SuppressWarnings("deprecation")
+    public JobConf getJobConf() {
+        return this.jobConf;
     }
     
     public Map<OperatorKey, MapRedResult> getMaterializedResults() {
@@ -143,19 +145,19 @@
         // Now add the settings from "properties" object to override any 
existing properties
         // All of the above is accomplished in the method call below
            
-        JobConf jobConf = null;
+        JobConf jc = null;
         if( this.pigContext.getExecType() == ExecType.LOCAL ) {
             // We dont load any configurations here
-            jobConf = new JobConf( false );
+            jc = new JobConf( false );
         } else {
-            jobConf = new JobConf();
-            jobConf.addResource("pig-cluster-hadoop-site.xml");
+            jc = new JobConf();
+            jc.addResource("pig-cluster-hadoop-site.xml");
         }
             
         //the method below alters the properties object by overriding the
         //hadoop properties with the values from properties and recomputing
         //the properties
-        recomputeProperties(jobConf, properties);
+        recomputeProperties(jc, properties);
 
         // If we are running in local mode we dont read the hadoop conf file
         if ( this.pigContext.getExecType() != ExecType.LOCAL ) {
@@ -218,15 +220,8 @@
             log.info("Connecting to map-reduce job tracker at: " + 
properties.get(JOB_TRACKER_LOCATION));
         }
 
-        try {
-            // Set job-specific configuration knobs
-            jobClient = new JobClient(new JobConf(configuration));
-        }
-        catch (IOException e) {
-            int errCode = 6009;
-            String msg = "Failed to create job client:" + e.getMessage();
-            throw new ExecException(msg, errCode, PigException.BUG, e);
-        }
+        // Set job-specific configuration knobs
+        jobConf = new JobConf(configuration);
     }
 
     public Properties getConfiguration() throws ExecException {

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=910678&r1=910677&r2=910678&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Tue Feb 16 20:37:31 2010
@@ -1888,7 +1888,8 @@
      * @throws PlanException
      * @throws VisitorException
      */
-       protected Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, 
MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
+       @SuppressWarnings("deprecation")
+    protected Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, 
MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
                        FileSpec lFile, FileSpec sampleFile, int rp, 
List<PhysicalPlan> sortKeyPlans, 
                        String udfClassName, String[] udfArgs, String 
sampleLdrClassName ) throws PlanException, VisitorException {
                
@@ -2064,7 +2065,7 @@
                     if(val<=0)
                         val = pigContext.defaultParallel;
                     if (val<=0)
-                        val = 
((JobConf)((HExecutionEngine)eng).getJobClient().getConf()).getNumReduceTasks();
+                        val = 
((JobConf)((HExecutionEngine)eng).getJobConf()).getNumReduceTasks();
                     if (val<=0)
                         val = 1;
                 } catch (Exception e) {

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=910678&r1=910677&r2=910678&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Tue Feb 16 20:37:31 2010
@@ -112,7 +112,7 @@
         ExecutionEngine exe = pc.getExecutionEngine();
         ConfigurationValidator.validatePigProperties(exe.getConfiguration());
         Configuration conf = 
ConfigurationUtil.toConfiguration(exe.getConfiguration());
-        JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
+        JobClient jobClient = new 
JobClient(((HExecutionEngine)exe).getJobConf());
 
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
         

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=910678&r1=910677&r2=910678&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
 Tue Feb 16 20:37:31 2010
@@ -18,11 +18,9 @@
 package 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,7 +30,6 @@
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -43,15 +40,12 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.impl.util.ObjectSerializer;
 
 /** This operator implements merge join algorithm to do map side joins. 
  *  Currently, only two-way joins are supported. One input of join is 
identified as left
@@ -105,8 +99,6 @@
 
     private boolean noInnerPlanOnRightSide;
 
-    private PigContext pc;
-
     private Object curJoinKey;
 
     private Tuple curJoiningRightTup;
@@ -395,8 +387,6 @@
     @SuppressWarnings("unchecked")
     private void seekInRightStream(Object firstLeftKey) throws IOException{
         rightLoader = 
(IndexableLoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
-        pc = 
(PigContext)ObjectSerializer.deserialize(PigMapReduce.sJobConf.get("pig.pigContext"));
-        pc.connect();
         // Pass signature of the loader to rightLoader
         // make a copy of the conf to use in calls to rightLoader.
         Configuration conf = new Configuration(PigMapReduce.sJobConf);

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/grunt/GruntParser.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/grunt/GruntParser.java?rev=910678&r1=910677&r2=910678&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/grunt/GruntParser.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/grunt/GruntParser.java
 Tue Feb 16 20:37:31 2010
@@ -44,6 +44,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.pig.PigServer;
@@ -63,6 +64,7 @@
 import org.apache.pig.tools.pigscript.parser.PigScriptParser;
 import org.apache.pig.tools.pigscript.parser.PigScriptParserTokenManager;
 
+@SuppressWarnings("deprecation")
 public class GruntParser extends PigScriptParser {
 
     private final Log log = LogFactory.getLog(getClass());
@@ -195,13 +197,14 @@
         //
         ExecutionEngine execEngine = 
mPigServer.getPigContext().getExecutionEngine();
         if (execEngine instanceof HExecutionEngine) {
-            mJobClient = ((HExecutionEngine)execEngine).getJobClient();
+            mJobConf = ((HExecutionEngine)execEngine).getJobConf();
         }
         else {
-            mJobClient = null;
+            mJobConf = null;
         }
     }
 
+    @Override
     public void prompt()
     {
         if (mInteractive) {
@@ -209,6 +212,7 @@
         }
     }
     
+    @Override
     protected void quit()
     {
         mDone = true;
@@ -218,6 +222,7 @@
         return mDone;
     }
     
+    @Override
     protected void processDescribe(String alias) throws IOException {
         if(alias==null) {
             alias = mPigServer.getPigContext().getLastAlias();
@@ -225,6 +230,7 @@
         mPigServer.dumpSchema(alias);
     }
 
+    @Override
     protected void processExplain(String alias, String script, boolean 
isVerbose, 
                                   String format, String target, 
                                   List<String> params, List<String> files) 
@@ -309,10 +315,12 @@
         }
     }
 
+    @Override
     protected void printAliases() throws IOException {
         mPigServer.printAliases();
     }
     
+    @Override
     protected void processRegister(String jar) throws IOException {
         mPigServer.registerJar(jar);
     }
@@ -336,6 +344,7 @@
         return writer.toString();
     }
 
+    @Override
     protected void processScript(String script, boolean batch, 
                                  List<String> params, List<String> files) 
         throws IOException, ParseException {
@@ -408,6 +417,7 @@
         }
     }
 
+    @Override
     protected void processSet(String key, String value) throws IOException, 
ParseException {
         if (key.equals("debug"))
         {
@@ -452,6 +462,7 @@
         }
     }
     
+    @Override
     protected void processCat(String path) throws IOException
     {
         executeBatch();
@@ -495,6 +506,7 @@
         }
     }
 
+    @Override
     protected void processCD(String path) throws IOException
     {    
         ContainerDescriptor container;
@@ -526,6 +538,7 @@
         }
     }
 
+    @Override
     protected void processDump(String alias) throws IOException
     {
         Iterator<Tuple> result = mPigServer.openIterator(alias);
@@ -536,16 +549,19 @@
         }
     }
     
+    @Override
     protected void processIllustrate(String alias) throws IOException
     {
        mPigServer.getExamples(alias);
     }
 
+    @Override
     protected void processKill(String jobid) throws IOException
     {
-        if (mJobClient != null) {
+        if (mJobConf != null) {
+            JobClient jc = new JobClient(mJobConf);
             JobID id = JobID.forName(jobid);
-            RunningJob job = mJobClient.getJob(id);
+            RunningJob job = jc.getJob(id);
             if (job == null)
                 System.out.println("Job with id " + jobid + " is not active");
             else
@@ -556,6 +572,7 @@
         }
     }
         
+    @Override
     protected void processLS(String path) throws IOException
     {
         try {
@@ -605,11 +622,13 @@
         System.out.println(elem.toString() + "<r " + replication + ">\t" + 
len);
     }
     
+    @Override
     protected void processPWD() throws IOException 
     {
         System.out.println(mDfs.getActiveContainer().toString());
     }
 
+    @Override
     protected void printHelp() 
     {
         System.out.println("Commands:");
@@ -628,6 +647,7 @@
         System.out.println("quit");
     }
 
+    @Override
     protected void processMove(String src, String dst) throws IOException
     {
         executeBatch();
@@ -647,6 +667,7 @@
         }
     }
     
+    @Override
     protected void processCopy(String src, String dst) throws IOException
     {
         executeBatch();
@@ -662,6 +683,7 @@
         }
     }
     
+    @Override
     protected void processCopyToLocal(String src, String dst) throws 
IOException
     {
         executeBatch();
@@ -677,6 +699,7 @@
         }
     }
 
+    @Override
     protected void processCopyFromLocal(String src, String dst) throws 
IOException
     {
         executeBatch();
@@ -692,12 +715,14 @@
         }
     }
     
+    @Override
     protected void processMkdir(String dir) throws IOException
     {
         ContainerDescriptor dirDescriptor = mDfs.asContainer(dir);
         dirDescriptor.create();
     }
     
+    @Override
     protected void processPig(String cmd) throws IOException
     {
         int start = 1;
@@ -713,6 +738,7 @@
         }
     }
 
+    @Override
     protected void processRemove(String path, String options ) throws 
IOException
     {
         ElementDescriptor dfsPath = mDfs.asElement(path);
@@ -730,6 +756,7 @@
         }
     }
 
+    @Override
     protected void processFsCommand(String[] cmdTokens) throws IOException{
         try {
             shell.run(cmdTokens);
@@ -765,7 +792,7 @@
     private DataStorage mDfs;
     private DataStorage mLfs;
     private Properties mConf;
-    private JobClient mJobClient;
+    private JobConf mJobConf;
     private boolean mDone;
     private boolean mLoadOnly;
     private ExplainState mExplain;


Reply via email to