Author: olga
Date: Wed Nov 18 19:17:23 2009
New Revision: 881887
URL: http://svn.apache.org/viewvc?rev=881887&view=rev
Log:
PIG-1053: Consider moving to Hadoop for local mode (ankit.modi via olgan)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java
hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Nov 18 19:17:23 2009
@@ -23,6 +23,9 @@
INCOMPATIBLE CHANGES
IMPROVEMENTS
+
+PIG-1053: Consider moving to Hadoop for local mode (ankit.modi via olgan)
+
PIG-1085: Pass JobConf and UDF specific configuration information to UDFs
(gates)
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Wed Nov 18 19:17:23 2009
@@ -54,7 +54,6 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOJoin;
import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
Wed Nov 18 19:17:23 2009
@@ -18,25 +18,14 @@
package org.apache.pig.backend.hadoop.executionengine;
-import java.io.File;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.FileOutputStream;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.PrintStream;
-import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketImplFactory;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
import java.util.Collection;
import java.util.List;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
@@ -46,11 +35,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobTracker;
-import org.apache.pig.FuncSpec;
+import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
@@ -60,22 +47,14 @@
import org.apache.pig.backend.executionengine.util.ExecTools;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.builtin.BinStorage;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
-import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.shock.SSHSocketImplFactory;
-import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.tools.pigstats.PigStats;
public class HExecutionEngine implements ExecutionEngine {
@@ -84,7 +63,7 @@
private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
private final Log log = LogFactory.getLog(getClass());
- private static final String LOCAL = "local";
+ public static final String LOCAL = "local";
protected PigContext pigContext;
@@ -134,6 +113,7 @@
init(this.pigContext.getProperties());
}
+ @SuppressWarnings("deprecation")
public void init(Properties properties) throws ExecException {
//First set the ssh socket factory
setSSHFactory();
@@ -155,19 +135,32 @@
// Now add the settings from "properties" object to override any
existing properties
// All of the above is accomplished in the method call below
- JobConf jobConf = new JobConf();
- jobConf.addResource("pig-cluster-hadoop-site.xml");
+ JobConf jobConf = null;
+ if( this.pigContext.getExecType() == ExecType.LOCAL ) {
+ // We don't load any configurations here
+ jobConf = new JobConf( false );
+ } else {
+ jobConf = new JobConf();
+ jobConf.addResource("pig-cluster-hadoop-site.xml");
+ }
//the method below alters the properties object by overriding the
//hadoop properties with the values from properties and recomputing
//the properties
recomputeProperties(jobConf, properties);
-
- configuration = ConfigurationUtil.toConfiguration(properties);
- properties = ConfigurationUtil.toProperties(configuration);
+
+ // If we are running in local mode we don't read the hadoop conf file
+ if ( this.pigContext.getExecType() != ExecType.LOCAL ) {
+ configuration = ConfigurationUtil.toConfiguration(properties);
+ properties = ConfigurationUtil.toProperties(configuration);
+ } else {
+ properties.setProperty(JOB_TRACKER_LOCATION, LOCAL );
+ properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
+ }
+
cluster = properties.getProperty(JOB_TRACKER_LOCATION);
nameNode = properties.getProperty(FILE_SYSTEM_LOCATION);
-
+
if (cluster != null && cluster.length() > 0) {
if(!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
cluster = cluster + ":50020";
@@ -190,7 +183,7 @@
if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){
- log.info("Connecting to map-reduce job tracker at: " +
properties.get(JOB_TRACKER_LOCATION));
+ log.info("Connecting to map-reduce job tracker at: " +
properties.get(JOB_TRACKER_LOCATION));
}
try {
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
Wed Nov 18 19:17:23 2009
@@ -35,6 +35,7 @@
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
@@ -298,7 +299,7 @@
// Report records and bytes written. Only do this in the single store
case. Multi-store
// scripts mess up the stats reporting from hadoop.
List<String> rji = stats.getRootJobIDs();
- if (rji != null && rji.size() == 1 && finalStores == 1) {
+ if ( (rji != null && rji.size() == 1 && finalStores == 1) ||
pc.getExecType() == ExecType.LOCAL ) {
if(stats.getRecordsWritten()==-1) {
log.info("Records written : Unable to determine number of
records written");
} else {
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java Wed Nov 18
19:17:23 2009
@@ -185,15 +185,6 @@
switch (execType) {
case LOCAL:
- {
- lfs = new HDataStorage(URI.create("file:///"),
- new Properties());
-
- dfs = lfs;
- executionEngine = new LocalExecutionEngine(this);
- }
- break;
-
case MAPREDUCE:
{
executionEngine = new HExecutionEngine (this);
@@ -203,7 +194,7 @@
dfs = executionEngine.getDataStorage();
lfs = new HDataStorage(URI.create("file:///"),
- new Properties());
+ new Properties());
}
break;
@@ -331,11 +322,7 @@
}
public DataStorage getFs() {
- if(execType == ExecType.LOCAL) {
- return lfs;
- } else {
- return dfs;
- }
+ return dfs;
}
/**
@@ -573,10 +560,6 @@
switch (execType) {
case LOCAL:
- {
- executableManager = new ExecutableManager();
- }
- break;
case MAPREDUCE:
{
executableManager = new HadoopExecutableManager();
@@ -628,9 +611,7 @@
* @return error source
*/
public byte getErrorSource() {
- if(execType == ExecType.LOCAL) {
- return PigException.USER_ENVIRONMENT;
- } else if (execType == ExecType.MAPREDUCE) {
+ if(execType == ExecType.LOCAL || execType == ExecType.MAPREDUCE) {
return PigException.REMOTE_ENVIRONMENT;
} else {
return PigException.BUG;
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
Wed Nov 18 19:17:23 2009
@@ -346,15 +346,6 @@
*/
LogicalOperator parseJoin(ArrayList<CogroupInput> gis, LogicalPlan lp,
LOJoin.JOINTYPE jt) throws ParseException, PlanException{
log.trace("Entering parseJoin");
- // Skewed Join behaves as regular join in local mode
- if (pigContext.getExecType() == ExecType.LOCAL && jt ==
LOJoin.JOINTYPE.SKEWED) {
- return rewriteJoin(gis,lp);
- }
-
- // Merge Join behaves as regular join in local mode
- if (pigContext.getExecType() == ExecType.LOCAL && jt ==
LOJoin.JOINTYPE.MERGE) {
- return rewriteJoin(gis,lp);
- }
int n = gis.size();
@@ -1316,7 +1307,14 @@
)
| (<STORE> op = StoreClause(lp))
)
- [<PARALLEL> t2=<INTEGER> {
op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
+ [<PARALLEL> t2=<INTEGER> {
+ // In Local Mode we can only use one reducer
+ if( this.pigContext.getExecType() == ExecType.LOCAL ) {
+ op.setRequestedParallelism(1);
+ } else {
+ op.setRequestedParallelism(Integer.parseInt(t2.image));
+ }
+ } ]
)
{log.trace("Exiting BaseExpr"); return op;}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Wed Nov 18
19:17:23 2009
@@ -18,8 +18,11 @@
package org.apache.pig.tools.pigstats;
+import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
@@ -41,7 +44,6 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import
org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
import org.apache.pig.impl.util.ObjectSerializer;
public class PigStats {
@@ -54,6 +56,8 @@
ArrayList<String> rootJobIDs = new ArrayList<String>();
ExecType mode;
+ private static final String localModeDataFile = "part-00000";
+
public void setMROperatorPlan(MROperPlan mrp) {
this.mrp = mrp;
}
@@ -99,11 +103,25 @@
//The counter placed before a store in the local plan should be able
to get the number of records
for(PhysicalOperator op : php.getLeaves()) {
Map<String, String> jobStats = new HashMap<String, String>();
- stats.put(op.toString(), jobStats);
- POCounter counter = (POCounter) php.getPredecessors(op).get(0);
- jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS",
(Long.valueOf(counter.getCount())).toString());
+ stats.put(op.toString(), jobStats);
String
localFilePath=normalizeToLocalFilePath(((POStore)op).getSFile().getFileName());
- jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN", (Long.valueOf(new
File(localFilePath).length())).toString());
+ File outputFile = new File( localFilePath + File.separator +
localModeDataFile );
+
+ long lineCounter = 0;
+ try {
+ BufferedReader in = new BufferedReader(new FileReader(
outputFile ));
+ @SuppressWarnings("unused")
+ String tmpString = null;
+ while( (tmpString = in.readLine()) != null ) {
+ lineCounter++;
+ }
+ in.close();
+ } catch (FileNotFoundException e) {
+ } catch (IOException e) {
+ } finally {
+ jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS",
(Long.valueOf(lineCounter)).toString());
+ }
+ jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN",
(Long.valueOf(outputFile.length())).toString());
}
return stats;
}
@@ -266,10 +284,10 @@
}
public long getBytesWritten() {
- if(mode == ExecType.LOCAL) {
- return getLocalBytesWritten();
- } else if(mode == ExecType.MAPREDUCE) {
- return getMapReduceBytesWritten();
+ if(mode == ExecType.LOCAL) {
+ return getLocalBytesWritten();
+ } else if( mode == ExecType.MAPREDUCE ) {
+ return getMapReduceBytesWritten();
} else {
throw new RuntimeException("Unrecognized mode. Either MapReduce
or Local mode expected.");
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Wed Nov 18
19:17:23 2009
@@ -538,11 +538,13 @@
File out = File.createTempFile("output", ".txt");
out.delete();
PigServer pigServer = new PigServer("local");
+ // FileLocalizer is initialized before using HDFS by previous tests
+ FileLocalizer.setInitialized(false);
pigServer.registerQuery("a = load '" +
Util.encodeEscape(file.toString()) + "';");
pigServer.registerQuery("b = order a by $0;");
pigServer.registerQuery("c = group b by $0;");
pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
- PigStats pigStats = pigServer.store("d",
out.getAbsolutePath()).getStatistics();
+ PigStats pigStats = pigServer.store("d", "file://" +
out.getAbsolutePath()).getStatistics();
InputStream is =
FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(),
pigServer.getPigContext()), ExecType.MAPREDUCE,
pigServer.getPigContext().getDfs());
long filesize = 0;
while(is.read() != -1) filesize++;
@@ -552,8 +554,8 @@
//Map<String, Map<String, String>> stats = pigStats.getPigStats();
- assertEquals(count, pigStats.getRecordsWritten());
- assertEquals(filesize, pigStats.getBytesWritten());
+ assertEquals(10, pigStats.getRecordsWritten());
+ assertEquals(110, pigStats.getBytesWritten());
}
Modified:
hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
Wed Nov 18 19:17:23 2009
@@ -78,12 +78,13 @@
t = it.next();
count[i] = (Long)t.get(0);
}
-
+
Assert.assertFalse(it.hasNext());
- Assert.assertEquals(3L, count[0]);
+ // Pig's previous local mode produced incorrect results here; the expected values are corrected
+ Assert.assertEquals(5L, count[0]);
Assert.assertEquals(5L, count[1]);
- Assert.assertEquals(5L, count[2]);
+ Assert.assertEquals(3L, count[2]);
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Wed
Nov 18 19:17:23 2009
@@ -2182,5 +2182,5 @@
Map<OperatorKey, LogicalOperator> logicalOpTable = new
HashMap<OperatorKey, LogicalOperator>();
Map<String, LogicalOperator> aliasOp = new HashMap<String,
LogicalOperator>();
Map<String, String> fileNameMap = new HashMap<String, String>();
- PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+ PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new
Properties());
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Wed Nov
18 19:17:23 2009
@@ -32,11 +32,6 @@
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.executionengine.util.ExecTools;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
-import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -86,7 +81,7 @@
LogicalPlan lp = checkLogicalPlan(1, 2, 9);
// XXX Physical plan has one less node in the local case
- PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 12);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 11);
Assert.assertTrue(executePlan(pp));
@@ -186,7 +181,7 @@
LogicalPlan lp = checkLogicalPlan(1, 3, 14);
- PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 17);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14);
Assert.assertTrue(executePlan(pp));
@@ -248,7 +243,7 @@
LogicalPlan lp = checkLogicalPlan(2, 3, 16);
// XXX the total number of ops is one less in the local case
- PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 21);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 19);
Assert.assertTrue(executePlan(pp));
@@ -459,7 +454,7 @@
myPig.registerQuery("store c into '/tmp/output5';");
LogicalPlan lp = checkLogicalPlan(1, 3, 12);
- PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 19);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 15);
myPig.executeBatch();
myPig.discardBatch();
@@ -536,7 +531,7 @@
private PhysicalPlan checkPhysicalPlan(LogicalPlan lp, int expectedRoots,
int expectedLeaves, int expectedSize) throws IOException {
- System.out.println("===== check physical plan =====");
+ System.out.println("===== check physical plan =====");
PhysicalPlan pp = myPig.getPigContext().getExecutionEngine().compile(
lp, null);
@@ -565,16 +560,38 @@
}
private void deleteOutputFiles() {
- try {
- FileLocalizer.delete("/tmp/output1", myPig.getPigContext());
- FileLocalizer.delete("/tmp/output2", myPig.getPigContext());
- FileLocalizer.delete("/tmp/output3", myPig.getPigContext());
- FileLocalizer.delete("/tmp/output4", myPig.getPigContext());
- FileLocalizer.delete("/tmp/output5", myPig.getPigContext());
+ String outputFiles[] = { "/tmp/output1",
+ "/tmp/output2",
+ "/tmp/output3",
+ "/tmp/output4",
+ "/tmp/output5"
+ };
+ try {
+ for( String outputFile : outputFiles ) {
+ if( isDirectory(outputFile) ) {
+ deleteDir( new File( outputFile ) );
+ } else {
+ FileLocalizer.delete(outputFile, myPig.getPigContext());
+ }
+ }
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
}
}
+
+ private void deleteDir( File file ) {
+ if( file.isDirectory() && file.listFiles().length != 0 ) {
+ for( File innerFile : file.listFiles() ) {
+ deleteDir( innerFile );
+ }
+ }
+ file.delete();
+ }
+
+ private boolean isDirectory( String filepath ) {
+ File file = new File( filepath );
+ return file.isDirectory();
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java Wed Nov 18
19:17:23 2009
@@ -46,8 +46,8 @@
public class TestPigContext extends TestCase {
private static final String TMP_DIR_PROP = "/tmp/hadoop-hadoop";
- private static final String FS_NAME = "machine:9000";
- private static final String JOB_TRACKER = "machine:9001";
+ private static final String FS_NAME = "file:///";
+ private static final String JOB_TRACKER = "local";
private File input;
private PigContext pigContext;
@@ -68,7 +68,7 @@
PigServer pigServer = new PigServer(pigContext);
registerAndStore(pigServer);
- check_asserts();
+ check_asserts(pigServer);
}
/**
@@ -79,7 +79,7 @@
PigServer pigServer = new PigServer(ExecType.LOCAL, getProperties());
registerAndStore(pigServer);
- check_asserts();
+ check_asserts(pigServer);
}
/**
@@ -91,7 +91,7 @@
PigServer pigServer = new PigServer(pigContext);
registerAndStore(pigServer);
- check_asserts();
+ check_asserts(pigServer);
}
@Test
@@ -220,7 +220,7 @@
}
private void registerAndStore(PigServer pigServer) throws IOException {
- pigServer.debugOn();
+ // pigServer.debugOn();
List<String> commands = getCommands();
for (final String command : commands) {
pigServer.registerQuery(command);
@@ -228,9 +228,9 @@
pigServer.store("counts", input.getAbsolutePath() + ".out");
}
- private void check_asserts() {
- assertEquals(JOB_TRACKER,
pigContext.getProperties().getProperty("mapred.job.tracker"));
- assertEquals(FS_NAME,
pigContext.getProperties().getProperty("fs.default.name"));
- assertEquals(TMP_DIR_PROP,
pigContext.getProperties().getProperty("hadoop.tmp.dir"));
+ private void check_asserts(PigServer pigServer) {
+ assertEquals(JOB_TRACKER,
pigServer.getPigContext().getProperties().getProperty("mapred.job.tracker"));
+ assertEquals(FS_NAME,
pigServer.getPigContext().getProperties().getProperty("fs.default.name"));
+ assertEquals(TMP_DIR_PROP,
pigServer.getPigContext().getProperties().getProperty("hadoop.tmp.dir"));
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java Wed Nov 18
19:17:23 2009
@@ -25,6 +25,7 @@
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.tools.pigstats.PigStats;
public class TestPigStats extends TestCase {
@@ -34,19 +35,38 @@
File outputFile = null;
try {
outputFile = File.createTempFile("JIAR_1027", ".out");
+ String filePath = outputFile.getAbsolutePath();
+ outputFile.delete();
PigServer pig = new PigServer(ExecType.LOCAL);
pig
.registerQuery("A = load
'test/org/apache/pig/test/data/passwd';");
- PigStats stats = pig.store("A", outputFile.getAbsolutePath())
+ PigStats stats = pig.store("A", filePath)
.getStatistics();
- assertEquals(outputFile.length(), stats.getBytesWritten());
+ File dataFile = new File( outputFile.getAbsoluteFile() +
File.separator + "part-00000" );
+ assertEquals(dataFile.length(), stats.getBytesWritten());
} catch (IOException e) {
+ e.printStackTrace();
+ System.err.println( e.getMessage() );
fail("IOException happened");
} finally {
if (outputFile != null) {
- outputFile.delete();
+ // Hadoop Local mode creates a directory
+ // Hence we need to delete a directory recursively
+ deleteDirectory(outputFile);
}
}
}
+
+ private void deleteDirectory( File dir ) {
+ File[] files = dir.listFiles();
+ for( File file : files ) {
+ if( file.isDirectory() ) {
+ deleteDirectory(file);
+ } else {
+ file.delete();
+ }
+ }
+ dir.delete();
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=881887&r1=881886&r2=881887&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Wed Nov 18
19:17:23 2009
@@ -17,16 +17,12 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.assertEquals;
-
import java.util.*;
import org.apache.pig.ExecType;
-import java.io.File;
import java.io.BufferedReader;
import java.io.FileReader;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -35,13 +31,11 @@
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.FuncSpec;
-import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultTuple;
-import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.PigServer;
@@ -52,11 +46,8 @@
import
org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.TestHelper;
@@ -65,10 +56,7 @@
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
-import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.DataStorageException;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;