Author: olga
Date: Wed Nov 18 19:17:23 2009
New Revision: 881887

URL: http://svn.apache.org/viewvc?rev=881887&view=rev
Log:
PIG-1053: Consider moving to Hadoop for local mode (ankit.modi via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Nov 18 19:17:23 2009
@@ -23,6 +23,9 @@
 INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
+
+PIG-1053: Consider moving to Hadoop for local mode (ankit.modi via olgan)
+
 PIG-1085:  Pass JobConf and UDF specific configuration information to UDFs
                   (gates)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Wed Nov 18 19:17:23 2009
@@ -54,7 +54,6 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOJoin;
 import org.apache.pig.impl.logicalLayer.LOLoad;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Nov 18 19:17:23 2009
@@ -18,25 +18,14 @@
 
 package org.apache.pig.backend.hadoop.executionengine;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.FileOutputStream;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.PrintStream;
-import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketImplFactory;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.List;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -46,11 +35,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobTracker;
-import org.apache.pig.FuncSpec;
+import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -60,22 +47,14 @@
 import org.apache.pig.backend.executionengine.util.ExecTools;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.shock.SSHSocketImplFactory;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.tools.pigstats.PigStats;
 
 public class HExecutionEngine implements ExecutionEngine {
@@ -84,7 +63,7 @@
     private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
     
     private final Log log = LogFactory.getLog(getClass());
-    private static final String LOCAL = "local";
+    public static final String LOCAL = "local";
     
     protected PigContext pigContext;
     
@@ -134,6 +113,7 @@
         init(this.pigContext.getProperties());
     }
     
+    @SuppressWarnings("deprecation")
     public void init(Properties properties) throws ExecException {
         //First set the ssh socket factory
         setSSHFactory();
@@ -155,19 +135,32 @@
         // Now add the settings from "properties" object to override any 
existing properties
         // All of the above is accomplished in the method call below
            
-        JobConf jobConf = new JobConf();
-        jobConf.addResource("pig-cluster-hadoop-site.xml");
+        JobConf jobConf = null;
+        if( this.pigContext.getExecType() == ExecType.LOCAL ) {
+            // We dont load any configurations here
+            jobConf = new JobConf( false );
+        } else {
+            jobConf = new JobConf();
+            jobConf.addResource("pig-cluster-hadoop-site.xml");
+        }
             
         //the method below alters the properties object by overriding the
         //hadoop properties with the values from properties and recomputing
         //the properties
         recomputeProperties(jobConf, properties);
-            
-        configuration = ConfigurationUtil.toConfiguration(properties);         
   
-        properties = ConfigurationUtil.toProperties(configuration);
+
+        // If we are running in local mode we dont read the hadoop conf file
+        if ( this.pigContext.getExecType() != ExecType.LOCAL ) {
+            configuration = ConfigurationUtil.toConfiguration(properties);
+            properties = ConfigurationUtil.toProperties(configuration);
+        } else {
+            properties.setProperty(JOB_TRACKER_LOCATION, LOCAL );
+            properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
+        }
+        
         cluster = properties.getProperty(JOB_TRACKER_LOCATION);
         nameNode = properties.getProperty(FILE_SYSTEM_LOCATION);
-            
+
         if (cluster != null && cluster.length() > 0) {
             if(!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
                 cluster = cluster + ":50020";
@@ -190,7 +183,7 @@
         
             
         if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){
-                log.info("Connecting to map-reduce job tracker at: " + 
properties.get(JOB_TRACKER_LOCATION));
+            log.info("Connecting to map-reduce job tracker at: " + 
properties.get(JOB_TRACKER_LOCATION));
         }
 
         try {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Nov 18 19:17:23 2009
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -298,7 +299,7 @@
         // Report records and bytes written.  Only do this in the single store 
case.  Multi-store
         // scripts mess up the stats reporting from hadoop.
         List<String> rji = stats.getRootJobIDs();
-        if (rji != null && rji.size() == 1 && finalStores == 1) {
+        if ( (rji != null && rji.size() == 1 && finalStores == 1) || 
pc.getExecType() == ExecType.LOCAL ) {
             if(stats.getRecordsWritten()==-1) {
                 log.info("Records written : Unable to determine number of 
records written");
             } else {

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java Wed Nov 18 19:17:23 2009
@@ -185,15 +185,6 @@
 
         switch (execType) {
             case LOCAL:
-            {
-                lfs = new HDataStorage(URI.create("file:///"),
-                                       new Properties());
-                
-                dfs = lfs;
-                executionEngine = new LocalExecutionEngine(this);
-            }
-            break;
-
             case MAPREDUCE:
             {
                 executionEngine = new HExecutionEngine (this);
@@ -203,7 +194,7 @@
                 dfs = executionEngine.getDataStorage();
                 
                 lfs = new HDataStorage(URI.create("file:///"),
-                                        new Properties());                
+                                        new Properties()); 
             }
             break;
             
@@ -331,11 +322,7 @@
     }
 
     public DataStorage getFs() {
-        if(execType == ExecType.LOCAL) {
-            return lfs;
-        } else {
-            return dfs;
-        }
+        return dfs;
     }
     
     /**
@@ -573,10 +560,6 @@
 
         switch (execType) {
             case LOCAL:
-            {
-                executableManager = new ExecutableManager();
-            }
-            break;
             case MAPREDUCE: 
             {
                 executableManager = new HadoopExecutableManager();
@@ -628,9 +611,7 @@
      * @return error source
      */
     public byte getErrorSource() {
-        if(execType == ExecType.LOCAL) {
-            return PigException.USER_ENVIRONMENT;
-        } else if (execType == ExecType.MAPREDUCE) {
+        if(execType == ExecType.LOCAL || execType == ExecType.MAPREDUCE) {
             return PigException.REMOTE_ENVIRONMENT;
         } else {
             return PigException.BUG;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Wed Nov 18 19:17:23 2009
@@ -346,15 +346,6 @@
         */
        LogicalOperator parseJoin(ArrayList<CogroupInput> gis, LogicalPlan lp, 
LOJoin.JOINTYPE jt) throws ParseException, PlanException{
                log.trace("Entering parseJoin");
-               // Skewed Join behaves as regular join in local mode
-               if (pigContext.getExecType() == ExecType.LOCAL && jt == 
LOJoin.JOINTYPE.SKEWED) {
-                       return rewriteJoin(gis,lp);
-               }
-
-      // Merge Join behaves as regular join in local mode
-               if (pigContext.getExecType() == ExecType.LOCAL && jt == 
LOJoin.JOINTYPE.MERGE) {
-            return rewriteJoin(gis,lp);
-        }
         
                int n = gis.size();
 
@@ -1316,7 +1307,14 @@
     )
 |   (<STORE> op = StoreClause(lp))
        )
-    [<PARALLEL> t2=<INTEGER> { 
op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
+    [<PARALLEL> t2=<INTEGER> {
+      // In Local Mode we can only use one reducer
+       if( this.pigContext.getExecType() == ExecType.LOCAL ) {
+               op.setRequestedParallelism(1);
+       } else {
+               op.setRequestedParallelism(Integer.parseInt(t2.image));
+       }
+    } ]
        )       
        {log.trace("Exiting BaseExpr"); return op;}
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Wed Nov 18 19:17:23 2009
@@ -18,8 +18,11 @@
 
 package org.apache.pig.tools.pigstats;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
@@ -41,7 +44,6 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import 
org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigStats {
@@ -54,6 +56,8 @@
     ArrayList<String> rootJobIDs = new ArrayList<String>();
     ExecType mode;
     
+    private static final String localModeDataFile = "part-00000";
+    
     public void setMROperatorPlan(MROperPlan mrp) {
         this.mrp = mrp;
     }
@@ -99,11 +103,25 @@
         //The counter placed before a store in the local plan should be able 
to get the number of records
         for(PhysicalOperator op : php.getLeaves()) {
             Map<String, String> jobStats = new HashMap<String, String>();
-            stats.put(op.toString(), jobStats);
-            POCounter counter = (POCounter) php.getPredecessors(op).get(0);
-            jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS", 
(Long.valueOf(counter.getCount())).toString());
+            stats.put(op.toString(), jobStats);         
             String 
localFilePath=normalizeToLocalFilePath(((POStore)op).getSFile().getFileName());
-            jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN", (Long.valueOf(new 
File(localFilePath).length())).toString());
+            File outputFile = new File( localFilePath + File.separator + 
localModeDataFile );
+            
+            long lineCounter = 0;
+            try {
+                BufferedReader in = new BufferedReader(new FileReader( 
outputFile ));
+                @SuppressWarnings("unused")
+                String tmpString = null;
+                while( (tmpString = in.readLine()) != null ) {
+                    lineCounter++;
+                }
+                in.close();
+            } catch (FileNotFoundException e) {
+            } catch (IOException e) {                
+            } finally {
+                jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS", 
(Long.valueOf(lineCounter)).toString());
+            }            
+            jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN", 
(Long.valueOf(outputFile.length())).toString());
         }
         return stats;
     }
@@ -266,10 +284,10 @@
     }
     
     public long getBytesWritten() {
-       if(mode == ExecType.LOCAL) {
-               return getLocalBytesWritten();
-       } else if(mode == ExecType.MAPREDUCE) {
-               return getMapReduceBytesWritten();
+        if(mode == ExecType.LOCAL) {           
+            return getLocalBytesWritten(); 
+       } else if( mode == ExecType.MAPREDUCE ) {
+           return getMapReduceBytesWritten();
        } else {
                throw new RuntimeException("Unrecognized mode. Either MapReduce 
or Local mode expected.");
        }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Wed Nov 18 19:17:23 2009
@@ -538,11 +538,13 @@
         File out = File.createTempFile("output", ".txt");
         out.delete();
         PigServer pigServer = new PigServer("local");
+        // FileLocalizer is initialized before using HDFS by previous tests
+        FileLocalizer.setInitialized(false);
         pigServer.registerQuery("a = load '" + 
Util.encodeEscape(file.toString()) + "';");
         pigServer.registerQuery("b = order a by $0;");
         pigServer.registerQuery("c = group b by $0;");
         pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
-        PigStats pigStats = pigServer.store("d", 
out.getAbsolutePath()).getStatistics();
+        PigStats pigStats = pigServer.store("d", "file://" + 
out.getAbsolutePath()).getStatistics();
         InputStream is = 
FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), 
pigServer.getPigContext()), ExecType.MAPREDUCE, 
pigServer.getPigContext().getDfs());
         long filesize = 0;
         while(is.read() != -1) filesize++;
@@ -552,8 +554,8 @@
         
         //Map<String, Map<String, String>> stats = pigStats.getPigStats();
         
-        assertEquals(count, pigStats.getRecordsWritten());
-        assertEquals(filesize, pigStats.getBytesWritten());
+        assertEquals(10, pigStats.getRecordsWritten());
+        assertEquals(110, pigStats.getBytesWritten());
 
     }
 

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Wed Nov 18 19:17:23 2009
@@ -78,12 +78,13 @@
             t = it.next();
             count[i] = (Long)t.get(0);
         }
-
+        
         Assert.assertFalse(it.hasNext());
 
-        Assert.assertEquals(3L, count[0]);
+        // Pig's previous local mode was screwed up correcting that
+        Assert.assertEquals(5L, count[0]);
         Assert.assertEquals(5L, count[1]);
-        Assert.assertEquals(5L, count[2]);
+        Assert.assertEquals(3L, count[2]);
     }
 
 

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Wed Nov 18 19:17:23 2009
@@ -2182,5 +2182,5 @@
     Map<OperatorKey, LogicalOperator> logicalOpTable = new 
HashMap<OperatorKey, LogicalOperator>();
     Map<String, LogicalOperator> aliasOp = new HashMap<String, 
LogicalOperator>();
     Map<String, String> fileNameMap = new HashMap<String, String>();
-    PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+    PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new 
Properties());
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Wed Nov 18 19:17:23 2009
@@ -32,11 +32,6 @@
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.executionengine.util.ExecTools;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -86,7 +81,7 @@
             LogicalPlan lp = checkLogicalPlan(1, 2, 9);
 
             // XXX Physical plan has one less node in the local case
-            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 12);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 11);
 
             Assert.assertTrue(executePlan(pp));
 
@@ -186,7 +181,7 @@
 
             LogicalPlan lp = checkLogicalPlan(1, 3, 14);
 
-            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 17);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14);
 
             Assert.assertTrue(executePlan(pp));
 
@@ -248,7 +243,7 @@
             LogicalPlan lp = checkLogicalPlan(2, 3, 16);
 
             // XXX the total number of ops is one less in the local case
-            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 21);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 19);
 
             Assert.assertTrue(executePlan(pp));
 
@@ -459,7 +454,7 @@
             myPig.registerQuery("store c into '/tmp/output5';");
 
             LogicalPlan lp = checkLogicalPlan(1, 3, 12);
-            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 19);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 15);
 
             myPig.executeBatch();
             myPig.discardBatch(); 
@@ -536,7 +531,7 @@
     private PhysicalPlan checkPhysicalPlan(LogicalPlan lp, int expectedRoots,
             int expectedLeaves, int expectedSize) throws IOException {
 
-        System.out.println("===== check physical plan =====");
+        System.out.println("===== check physical plan =====");        
 
         PhysicalPlan pp = myPig.getPigContext().getExecutionEngine().compile(
                 lp, null);
@@ -565,16 +560,38 @@
     }
 
     private void deleteOutputFiles() {
-        try {
-            FileLocalizer.delete("/tmp/output1", myPig.getPigContext());
-            FileLocalizer.delete("/tmp/output2", myPig.getPigContext());
-            FileLocalizer.delete("/tmp/output3", myPig.getPigContext());
-            FileLocalizer.delete("/tmp/output4", myPig.getPigContext());
-            FileLocalizer.delete("/tmp/output5", myPig.getPigContext());
+        String outputFiles[] = { "/tmp/output1",
+                                 "/tmp/output2",
+                                 "/tmp/output3",
+                                 "/tmp/output4",
+                                 "/tmp/output5"
+                };
+        try {
+            for( String outputFile : outputFiles ) {
+                if( isDirectory(outputFile) ) {
+                    deleteDir( new File( outputFile ) );
+                } else {
+                    FileLocalizer.delete(outputFile, myPig.getPigContext());
+                }    
+            }            
         } catch (IOException e) {
             e.printStackTrace();
             Assert.fail();
         }
     }
+    
+    private void deleteDir( File file ) {
+        if( file.isDirectory() && file.listFiles().length != 0 ) {
+            for( File innerFile : file.listFiles() ) {
+                deleteDir( innerFile );
+            }
+        }
+        file.delete();
+    }
+    
+    private boolean isDirectory( String filepath ) {
+        File file = new File( filepath );
+        return file.isDirectory();
+    }
 
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java Wed Nov 18 19:17:23 2009
@@ -46,8 +46,8 @@
 public class TestPigContext extends TestCase {
 
     private static final String TMP_DIR_PROP = "/tmp/hadoop-hadoop";
-    private static final String FS_NAME = "machine:9000";
-    private static final String JOB_TRACKER = "machine:9001";
+    private static final String FS_NAME = "file:///";
+    private static final String JOB_TRACKER = "local";
 
     private File input;
     private PigContext pigContext;
@@ -68,7 +68,7 @@
         PigServer pigServer = new PigServer(pigContext);
         registerAndStore(pigServer);
         
-        check_asserts();
+        check_asserts(pigServer);
     }
 
     /**
@@ -79,7 +79,7 @@
         PigServer pigServer = new PigServer(ExecType.LOCAL, getProperties());
         registerAndStore(pigServer);
         
-        check_asserts();
+        check_asserts(pigServer);
     }
 
     /**
@@ -91,7 +91,7 @@
         PigServer pigServer = new PigServer(pigContext);
         registerAndStore(pigServer);
         
-        check_asserts();
+        check_asserts(pigServer);
     }
     
     @Test
@@ -220,7 +220,7 @@
     }
 
     private void registerAndStore(PigServer pigServer) throws IOException {
-        pigServer.debugOn();
+        // pigServer.debugOn();
         List<String> commands = getCommands();
         for (final String command : commands) {
             pigServer.registerQuery(command);
@@ -228,9 +228,9 @@
         pigServer.store("counts", input.getAbsolutePath() + ".out");
     }
 
-    private void check_asserts() {
-        assertEquals(JOB_TRACKER, 
pigContext.getProperties().getProperty("mapred.job.tracker"));
-        assertEquals(FS_NAME, 
pigContext.getProperties().getProperty("fs.default.name"));
-        assertEquals(TMP_DIR_PROP, 
pigContext.getProperties().getProperty("hadoop.tmp.dir"));
+    private void check_asserts(PigServer pigServer) {
+        assertEquals(JOB_TRACKER, 
pigServer.getPigContext().getProperties().getProperty("mapred.job.tracker"));
+        assertEquals(FS_NAME, 
pigServer.getPigContext().getProperties().getProperty("fs.default.name"));
+        assertEquals(TMP_DIR_PROP, 
pigServer.getPigContext().getProperties().getProperty("hadoop.tmp.dir"));
     }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java Wed Nov 18 19:17:23 2009
@@ -25,6 +25,7 @@
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.pigstats.PigStats;
 
 public class TestPigStats extends TestCase {
@@ -34,19 +35,38 @@
         File outputFile = null;
         try {
             outputFile = File.createTempFile("JIAR_1027", ".out");
+            String filePath = outputFile.getAbsolutePath();
+            outputFile.delete();
             PigServer pig = new PigServer(ExecType.LOCAL);
             pig
                     .registerQuery("A = load 
'test/org/apache/pig/test/data/passwd';");
-            PigStats stats = pig.store("A", outputFile.getAbsolutePath())
+            PigStats stats = pig.store("A", filePath)
                     .getStatistics();
-            assertEquals(outputFile.length(), stats.getBytesWritten());
+            File dataFile = new File( outputFile.getAbsoluteFile() + 
File.separator + "part-00000" );
+            assertEquals(dataFile.length(), stats.getBytesWritten());
         } catch (IOException e) {
+            e.printStackTrace();
+            System.err.println( e.getMessage() );
             fail("IOException happened");
         } finally {
             if (outputFile != null) {
-                outputFile.delete();
+                // Hadoop Local mode creates a directory
+                // Hence we need to delete a directory recursively
+                deleteDirectory(outputFile);
             }
         }
 
     }
+    
+    private void deleteDirectory( File dir ) {
+        File[] files = dir.listFiles();
+        for( File file : files ) {
+            if( file.isDirectory() ) {
+                deleteDirectory(file);
+            } else {
+                file.delete();
+            }
+        }
+        dir.delete();
+    }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Wed Nov 18 19:17:23 2009
@@ -17,16 +17,12 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.*;
 
 import org.apache.pig.ExecType;
 
-import java.io.File;
 import java.io.BufferedReader;
 import java.io.FileReader;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -35,13 +31,11 @@
 import org.apache.pig.impl.plan.OperatorKey;
 
 import org.apache.pig.FuncSpec;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DefaultBagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultTuple;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.PigServer;
@@ -52,11 +46,8 @@
 import 
org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.apache.pig.test.utils.GenRandomData;
 import org.apache.pig.test.utils.TestHelper;
@@ -65,10 +56,7 @@
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
-import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.DataStorageException;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;


Reply via email to