Author: pradeepkth
Date: Fri Mar 6 22:41:15 2009
New Revision: 751117
URL: http://svn.apache.org/viewvc?rev=751117&view=rev
Log:
PIG-627: multiquery support M2 (hagleitn via pradeepkth) - additional check-in of patch 'file_cmds-0305.patch'
Modified:
hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestGrunt.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java Fri Mar 6 22:41:15 2009
@@ -191,12 +191,7 @@
throw new IllegalStateException("setBatchOn() must be called first.");
}
- try {
- currDAG.execute();
- } finally {
- log.info("Delete the current graph.");
- currDAG = graphs.pop();
- }
+ currDAG.execute();
}
/**
@@ -332,62 +327,13 @@
}
public LogicalPlan clonePlan(String alias) throws IOException {
- // There are two choices on how we clone the logical plan
- // 1 - we really clone each operator and connect up the cloned operators
- // 2 - we cache away the script till the point we need to clone
- // and then simply re-parse the script.
- // The latter approach is used here
- // FIXME: There is one open issue with this now:
- // Consider the following script:
- // A = load 'file:/somefile';
- // B = filter A by $0 > 10;
- // store B into 'bla';
- // rm 'file:/somefile';
- // A = load 'file:/someotherfile'
- // when we try to clone - we try to reparse
- // from the beginning and currently the parser
- // checks for file existence of files in the load
- // in the case where the file is a local one -i.e. with file: prefix
- // This will be a known issue now and we will need to revisit later
-
- // parse each line of the cached script and the
- // final logical plan is the clone that we want
- LogicalPlan lp = null;
- int lineNumber = 1;
-
- // create data structures needed for parsing
- Graph graph = new Graph(true);
-
- for (Iterator<String> it = currDAG.getScriptCache().iterator(); it.hasNext(); lineNumber++) {
- if (isBatchOn()) {
- graph.registerQuery(it.next(), lineNumber);
- } else {
- lp = graph.parseQuery(it.next(), lineNumber);
- }
- }
-
- if(alias == null) {
- // a store prompted the execution - so return
- // the entire logical plan
- if (isBatchOn()) {
- lp = new LogicalPlan();
- for (LogicalPlan lpPart : graph.getStoreOpTable().values()) {
- lp.mergeSharedPlan(lpPart);
- }
- }
+ Graph graph = currDAG.clone();
- return lp;
- } else {
- // return the logical plan corresponding to the
- // alias supplied
- LogicalOperator op = graph.getAliasOp().get(alias);
- if(op == null) {
- int errCode = 1003;
- String msg = "Unable to find an operator for alias " + alias;
- throw new FrontendException(msg, errCode, PigException.INPUT);
- }
- return graph.getAliases().get(op);
+ if (graph == null) {
+ throw new AssertionError("Re-parsing has failed");
}
+
+ return graph.getPlan(alias);
}
public void registerQuery(String query) throws IOException {
@@ -435,20 +381,6 @@
* result
*/
public Iterator<Tuple> openIterator(String id) throws IOException {
- if (isBatchOn()) {
- log.info("Skip DUMP command in batch mode.");
- return new Iterator<Tuple>() {
- public boolean hasNext() {
- return false;
- }
- public Tuple next() {
- return null;
- }
- public void remove() {
- }
- };
- }
-
try {
LogicalOperator op = currDAG.getAliasOp().get(id);
if(null == op) {
@@ -913,14 +845,21 @@
private String jobName;
private boolean batchMode;
-
+
+ private int processedStores;
+
+ private int ignoreNumStores;
+
+ private LogicalPlan lp;
Graph(boolean batchMode) {
this.batchMode = batchMode;
+ this.processedStores = 0;
+ this.ignoreNumStores = 0;
this.jobName = "DefaultJobName";
+ this.lp = new LogicalPlan();
};
-
Map<LogicalOperator, LogicalPlan> getAliases() { return aliases; }
Map<OperatorKey, LogicalOperator> getOpTable() { return opTable; }
@@ -936,25 +875,42 @@
void execute() throws ExecException, FrontendException {
pigContext.getProperties().setProperty(PigContext.JOB_NAME,
PigContext.JOB_NAME_PREFIX + ":" + jobName);
PigServer.this.execute(null);
+ processedStores = storeOpTable.keySet().size();
}
void setJobName(String name) {
jobName = name;
}
+ LogicalPlan getPlan(String alias) throws IOException {
+ LogicalPlan plan = lp;
+
+ if (alias != null) {
+ LogicalOperator op = aliasOp.get(alias);
+ if(op == null) {
+ int errCode = 1003;
+ String msg = "Unable to find an operator for alias " + alias;
+ throw new FrontendException(msg, errCode, PigException.INPUT);
+ }
+ plan = aliases.get(op);
+ }
+ return plan;
+ }
+
void registerQuery(String query, int startLine) throws IOException {
- LogicalPlan lp = parseQuery(query, startLine);
+ LogicalPlan tmpLp = parseQuery(query, startLine);
// store away the query for use in cloning later
scriptCache.add(query);
- if (lp.getLeaves().size() == 1) {
- LogicalOperator op = lp.getSingleLeafPlanOutputOp();
-
+ if (tmpLp.getLeaves().size() == 1) {
+ LogicalOperator op = tmpLp.getSingleLeafPlanOutputOp();
+
// Check if we just processed a LOStore i.e. STORE
if (op instanceof LOStore) {
if (!batchMode) {
+ lp = tmpLp;
try {
execute();
} catch (Exception e) {
@@ -965,9 +921,13 @@
PigException.INPUT, e);
}
} else {
- storeOpTable.put(op, lp);
+ if (0 == ignoreNumStores) {
+ storeOpTable.put(op, tmpLp);
+ lp.mergeSharedPlan(tmpLp);
+ } else {
+ --ignoreNumStores;
+ }
}
-
}
}
}
@@ -991,5 +951,46 @@
throw new FrontendException(msg, errCode, PigException.INPUT,
false, null, e);
}
}
+
+ protected Graph clone() {
+ // There are two choices on how we clone the logical plan
+ // 1 - we really clone each operator and connect up the cloned operators
+ // 2 - we cache away the script till the point we need to clone
+ // and then simply re-parse the script.
+ // The latter approach is used here
+ // FIXME: There is one open issue with this now:
+ // Consider the following script:
+ // A = load 'file:/somefile';
+ // B = filter A by $0 > 10;
+ // store B into 'bla';
+ // rm 'file:/somefile';
+ // A = load 'file:/someotherfile'
+ // when we try to clone - we try to reparse
+ // from the beginning and currently the parser
+ // checks for file existence of files in the load
+ // in the case where the file is a local one -i.e. with file: prefix
+ // This will be a known issue now and we will need to revisit later
+
+ // parse each line of the cached script
+ int lineNumber = 1;
+
+ // create data structures needed for parsing
+ Graph graph = new Graph(true);
+ graph.ignoreNumStores = processedStores;
+ graph.processedStores = processedStores;
+
+ try {
+ for (Iterator<String> it = getScriptCache().iterator(); it.hasNext(); lineNumber++) {
+ if (isBatchOn()) {
+ graph.registerQuery(it.next(), lineNumber);
+ } else {
+ graph.lp = graph.parseQuery(it.next(), lineNumber);
+ }
+ }
+ } catch (IOException ioe) {
+ graph = null;
+ }
+ return graph;
+ }
}
}
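Taken together, the PigServer hunks reshape the batch lifecycle: executeBatch()
no longer pops the graph itself, Graph records processedStores after each
execution, and clone() re-parses the cached script while skipping that many
stores, so a second executeBatch() only runs work registered since the first.
A minimal caller-side sketch of that lifecycle, assuming a local exec type and
placeholder input/output paths (both hypothetical, not part of this patch):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class BatchLifecycleSketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.setBatchOn();                          // start collecting queries
            pig.registerQuery("a = load 'input';");
            pig.registerQuery("store a into 'out1';");
            pig.executeBatch();                        // runs the merged plan; store count recorded
            pig.registerQuery("b = load 'out1';");
            pig.registerQuery("store b into 'out2';");
            pig.executeBatch();                        // re-parse skips the already-executed store
            pig.discardBatch();                        // the caller now discards the graph explicitly
        }
    }

This is the same sequence the new testMultiQueryWithTwoStores2Execs tests below
exercise through the public API.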
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Mar 6 22:41:15 2009
@@ -72,11 +72,13 @@
List<Job> failedJobs = new LinkedList<Job>();
List<Job> succJobs = new LinkedList<Job>();
JobControl jc;
- int numMRJobs = mrp.size();
+ int totalMRJobs = mrp.size();
+ int numMRJobsCompl = 0;
+ int numMRJobsCurrent = 0;
double lastProg = -1;
while((jc = jcc.compile(mrp, grpName)) != null) {
- numMRJobs += jc.getWaitingJobs().size();
+ numMRJobsCurrent = jc.getWaitingJobs().size();
new Thread(jc).start();
@@ -84,7 +86,7 @@
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {}
- double prog = calculateProgress(jc, jobClient)/numMRJobs;
+ double prog = (numMRJobsCompl+calculateProgress(jc, jobClient))/totalMRJobs;
if(prog>=(lastProg+0.01)){
int perCom = (int)(prog * 100);
if(perCom!=100)
@@ -92,6 +94,7 @@
}
lastProg = prog;
}
+ numMRJobsCompl += numMRJobsCurrent;
failedJobs.addAll(jc.getFailedJobs());
succJobs.addAll(jc.getSuccessfulJobs());
jcc.moveResults();
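Since multi-query can compile one script into several JobControl batches, the
launcher now fixes totalMRJobs up front and accumulates numMRJobsCompl per
finished batch; the old code inflated the denominator on every compile round
and gave no credit to batches that had already completed. A sketch of the
corrected arithmetic, where progressOfBatch stands in for what
calculateProgress(jc, jobClient) returns for the batch currently running (the
class and method names here are hypothetical):

    final class ProgressSketch {
        // Overall progress: jobs finished in earlier batches, plus the
        // in-flight batch's progress, over the total fixed at compile time.
        static double overallProgress(int numMRJobsCompl, double progressOfBatch, int totalMRJobs) {
            return (numMRJobsCompl + progressOfBatch) / totalMRJobs;
        }
    }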
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java Fri Mar 6 22:41:15 2009
@@ -90,6 +90,23 @@
private void init() {
mDone = false;
+ mLoadOnly = false;
+ }
+
+ private void setBatchOn() {
+ mPigServer.setBatchOn();
+ }
+
+ private void executeBatch() throws IOException {
+ if (mPigServer.isBatchOn() && !mLoadOnly) {
+ mPigServer.executeBatch();
+ }
+ }
+
+ private void discardBatch() throws IOException {
+ if (mPigServer.isBatchOn()) {
+ mPigServer.discardBatch();
+ }
}
/**
@@ -106,7 +123,7 @@
}
if (!mInteractive) {
- mPigServer.setBatchOn();
+ setBatchOn();
}
try {
@@ -115,21 +132,17 @@
while(!mDone) {
parse();
}
- } catch(IOException e) {
- if (!mInteractive) {
- mPigServer.discardBatch();
- }
- throw e;
- } catch (ParseException e) {
- if (!mInteractive) {
- mPigServer.discardBatch();
- }
- throw e;
+
+ executeBatch();
+ }
+ finally {
+ discardBatch();
}
+ }
- if (!mInteractive) {
- mPigServer.executeBatch();
- }
+ public void setLoadOnly(boolean loadOnly)
+ {
+ mLoadOnly = loadOnly;
}
public void setParams(PigServer pigServer)
@@ -183,14 +196,14 @@
PrintStream out = System.out;
if (script != null) {
- mPigServer.setBatchOn();
+ setBatchOn();
try {
- loadScript(script, true, params, files);
+ loadScript(script, true, true, params, files);
} catch(IOException e) {
- mPigServer.discardBatch();
+ discardBatch();
throw e;
} catch (ParseException e) {
- mPigServer.discardBatch();
+ discardBatch();
throw e;
}
}
@@ -215,7 +228,7 @@
}
mPigServer.explain(alias, format, isVerbose, out, out, out);
if (script != null) {
- mPigServer.discardBatch();
+ discardBatch();
}
}
@@ -246,25 +259,26 @@
List<String> params, List<String> files)
throws IOException, ParseException {
+ if (script == null) {
+ executeBatch();
+ return;
+ }
+
if (batch) {
- mPigServer.setBatchOn();
+ setBatchOn();
try {
- loadScript(script, true, params, files);
- } catch (IOException e) {
- mPigServer.discardBatch();
- throw e;
- } catch (ParseException e) {
- mPigServer.discardBatch();
- throw e;
+ loadScript(script, true, mLoadOnly, params, files);
+ executeBatch();
+ } finally {
+ discardBatch();
}
- mPigServer.executeBatch();
} else {
- loadScript(script, false, params, files);
+ loadScript(script, false, mLoadOnly, params, files);
}
}
- private void loadScript(String script, boolean batch,
- List<String> params, List<String> files)
+ private void loadScript(String script, boolean batch, boolean loadOnly,
+ List<String> params, List<String> files)
throws IOException, ParseException {
Reader inputReader;
@@ -299,6 +313,7 @@
parser.setParams(mPigServer);
parser.setConsoleReader(reader);
parser.setInteractive(interactive);
+ parser.setLoadOnly(loadOnly);
parser.prompt();
while(!parser.isDone()) {
@@ -344,6 +359,8 @@
protected void processCat(String path) throws IOException
{
+ executeBatch();
+
try {
byte buffer[] = new byte[65536];
ElementDescriptor dfsPath = mDfs.asElement(path);
@@ -518,6 +535,8 @@
protected void processMove(String src, String dst) throws IOException
{
+ executeBatch();
+
try {
ElementDescriptor srcPath = mDfs.asElement(src);
ElementDescriptor dstPath = mDfs.asElement(dst);
@@ -535,6 +554,8 @@
protected void processCopy(String src, String dst) throws IOException
{
+ executeBatch();
+
try {
ElementDescriptor srcPath = mDfs.asElement(src);
ElementDescriptor dstPath = mDfs.asElement(dst);
@@ -548,6 +569,8 @@
protected void processCopyToLocal(String src, String dst) throws IOException
{
+ executeBatch();
+
try {
ElementDescriptor srcPath = mDfs.asElement(src);
ElementDescriptor dstPath = mLfs.asElement(dst);
@@ -561,6 +584,8 @@
protected void processCopyFromLocal(String src, String dst) throws IOException
{
+ executeBatch();
+
try {
ElementDescriptor srcPath = mLfs.asElement(src);
ElementDescriptor dstPath = mDfs.asElement(dst);
@@ -596,6 +621,8 @@
protected void processRemove(String path, String options ) throws IOException
{
ElementDescriptor dfsPath = mDfs.asElement(path);
+
+ executeBatch();
if (!dfsPath.exists()) {
if (options == null || !options.equalsIgnoreCase("force")) {
@@ -614,5 +641,6 @@
private Properties mConf;
private JobClient mJobClient;
private boolean mDone;
+ private boolean mLoadOnly;
}
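The pattern running through these GruntParser hunks: every command that touches
the filesystem (cat, mv, cp, copyToLocal, copyFromLocal, rm) now calls
executeBatch() first, so pending STOREs are flushed before the command reads or
deletes their outputs, while the new mLoadOnly flag suppresses the flush when a
script is loaded only for inspection (as explain does). A condensed sketch of
the handler shape; processSomeFileCmd is a hypothetical stand-in, not a method
in this patch:

    abstract class FileCmdSketch {
        // In GruntParser this flushes the pending batch (if batch is on).
        abstract void executeBatch() throws java.io.IOException;

        protected void processSomeFileCmd(String path) throws java.io.IOException {
            executeBatch();  // a prior "store a into 'bar';" is now visible to "rm bar;"
            // ... perform the actual filesystem operation on path ...
        }
    }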
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Mar 6 22:41:15 2009
@@ -530,7 +530,7 @@
void Script() throws IOException:
{
Token t;
- String script;
+ String script = null;
boolean batch = false;
ArrayList<String> params;
ArrayList<String> files;
@@ -556,8 +556,10 @@
t = GetPath()
{files.add(t.image);}
)*
- t = GetPath()
- {script = t.image;}
+ (
+ t = GetPath()
+ {script = t.image;}
+ )?
{processScript(script, batch, params, files);}
}
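Wrapping the trailing GetPath() in an optional ( ... )? makes the script
argument to exec optional; with no path, processScript receives null and takes
the new early-return branch in GruntParser, which simply executes the pending
batch. A hypothetical script fragment showing the effect (paths are
placeholders; testPartialExecution below drives the same sequence):

    a = load 'file:/input';
    store a into 'bar';
    exec;               -- no script path: run everything batched so far
    b = load 'bar';     -- safe: 'bar' was written by the exec above
    store b into 'baz';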
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestGrunt.java?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestGrunt.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestGrunt.java Fri Mar 6 22:41:15 2009
@@ -471,4 +471,50 @@
grunt.exec();
}
+
+ @Test
+ public void testPartialExecution() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd = "rmf bar; rmf baz; a = load 'file:test/org/apache/pig/test/data/passwd';"
+ +"store a into 'bar'; exec; a = load 'bar'; store a into 'baz';\n";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ grunt.exec();
+ }
+
+ @Test
+ public void testFileCmds() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd =
+ "rmf bar; rmf baz;"
+ +"a = load 'file:test/org/apache/pig/test/data/passwd';"
+ +"store a into 'bar';"
+ +"cp bar baz;"
+ +"rm bar; rm baz;"
+ +"store a into 'baz';"
+ +"store a into 'bar';"
+ +"rm baz; rm bar;"
+ +"store a into 'baz';"
+ +"mv baz bar;"
+ +"b = load 'bar';"
+ +"store b into 'baz';"
+ +"cat baz;"
+ +"rm baz;"
+ +"rm bar;\n";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ grunt.exec();
+ }
}
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java Fri Mar 6 22:41:15 2009
@@ -94,6 +94,22 @@
}
@Test
+ public void testEmptyExecute() {
+ System.out.println("=== test empty execute ===");
+
+ try {
+ myPig.setBatchOn();
+ myPig.executeBatch();
+ myPig.executeBatch();
+ myPig.discardBatch();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
public void testMultiQueryWithTwoStores2() {
System.out.println("===== test multi-query with 2 stores (2) =====");
@@ -119,6 +135,34 @@
}
@Test
+ public void testMultiQueryWithTwoStores2Execs() {
+
+ System.out.println("===== test multi-query with 2 stores (2) =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
+ myPig.registerQuery("b = filter a by uid > 5;");
+ myPig.executeBatch();
+ myPig.registerQuery("store b into '/tmp/output1';");
+ myPig.executeBatch();
+ myPig.registerQuery("c = group b by gid;");
+ myPig.registerQuery("store c into '/tmp/output2';");
+
+ myPig.executeBatch();
+ myPig.discardBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ deleteOutputFiles();
+ }
+ }
+
+ @Test
public void testMultiQueryWithThreeStores() {
System.out.println("===== test multi-query with 3 stores =====");
@@ -169,6 +213,7 @@
myPig.registerQuery("store d into '/tmp/output3';");
myPig.executeBatch();
+ myPig.discardBatch();
} catch (Exception e) {
e.printStackTrace();
@@ -233,6 +278,7 @@
myPig.registerQuery("store e into '/tmp/output3';");
myPig.executeBatch();
+ myPig.discardBatch();
} catch (Exception e) {
e.printStackTrace();
@@ -283,6 +329,7 @@
myPig.registerQuery("group b by gid;");
myPig.executeBatch();
+ myPig.discardBatch();
} catch (Exception e) {
e.printStackTrace();
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java Fri Mar 6 22:41:15 2009
@@ -71,7 +71,7 @@
myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("b = filter a by uid > 500;");
+ myPig.registerQuery("b = filter a by uid > 5;");
myPig.registerQuery("store b into '/tmp/output1';");
myPig.registerQuery("c = group b by gid;");
myPig.registerQuery("store c into '/tmp/output2';");
@@ -81,9 +81,6 @@
// XXX Physical plan has one less node in the local case
PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 10);
- // XXX MR plan doesn't seem to work in the local case
- checkMRPlan(pp, 2, 2, 2);
-
Assert.assertTrue(executePlan(pp));
} catch (Exception e) {
@@ -95,6 +92,22 @@
}
@Test
+ public void testEmptyExecute() {
+ System.out.println("=== test empty execute ===");
+
+ try {
+ myPig.setBatchOn();
+ myPig.executeBatch();
+ myPig.executeBatch();
+ myPig.discardBatch();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
public void testMultiQueryWithTwoStores2() {
System.out.println("===== test multi-query with 2 stores (2) =====");
@@ -104,12 +117,40 @@
myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("b = filter a by uid > 500;");
+ myPig.registerQuery("b = filter a by uid > 5;");
+ myPig.registerQuery("store b into '/tmp/output1';");
+ myPig.registerQuery("c = group b by gid;");
+ myPig.registerQuery("store c into '/tmp/output2';");
+
+ myPig.executeBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ deleteOutputFiles();
+ }
+ }
+
+ @Test
+ public void testMultiQueryWithTwoStores2Execs() {
+
+ System.out.println("===== test multi-query with 2 stores (2) =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
+ myPig.registerQuery("b = filter a by uid > 5;");
+ myPig.executeBatch();
myPig.registerQuery("store b into '/tmp/output1';");
+ myPig.executeBatch();
myPig.registerQuery("c = group b by gid;");
myPig.registerQuery("store c into '/tmp/output2';");
myPig.executeBatch();
+ myPig.discardBatch();
} catch (Exception e) {
e.printStackTrace();
@@ -129,20 +170,17 @@
myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("b = filter a by uid > 500;");
+ myPig.registerQuery("b = filter a by uid > 5;");
myPig.registerQuery("store b into '/tmp/output1';");
- myPig.registerQuery("c = filter b by uid > 1000;");
+ myPig.registerQuery("c = filter b by uid > 10;");
myPig.registerQuery("store c into '/tmp/output2';");
- myPig.registerQuery("d = filter c by uid > 1500;");
+ myPig.registerQuery("d = filter c by uid > 15;");
myPig.registerQuery("store d into '/tmp/output3';");
LogicalPlan lp = checkLogicalPlan(1, 3, 14);
PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14);
- // XXX MR plan doesn't seem to work in the local case
- checkMRPlan(pp, 3, 3, 3);
-
Assert.assertTrue(executePlan(pp));
} catch (Exception e) {
@@ -163,14 +201,15 @@
myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("b = filter a by uid > 500;");
+ myPig.registerQuery("b = filter a by uid > 5;");
myPig.registerQuery("store b into '/tmp/output1';");
- myPig.registerQuery("c = filter b by uid > 1000;");
+ myPig.registerQuery("c = filter b by uid > 10;");
myPig.registerQuery("store c into '/tmp/output2';");
- myPig.registerQuery("d = filter c by uid > 1500;");
+ myPig.registerQuery("d = filter c by uid > 15;");
myPig.registerQuery("store d into '/tmp/output3';");
myPig.executeBatch();
+ myPig.discardBatch();
} catch (Exception e) {
e.printStackTrace();
@@ -192,8 +231,8 @@
"using PigStorage(':') as (uname:chararray,
passwd:chararray, uid:int,gid:int);");
myPig.registerQuery("b = load
'file:test/org/apache/pig/test/data/passwd2' " +
"using PigStorage(':') as (uname:chararray,
passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("c = filter a by uid > 500;");
- myPig.registerQuery("d = filter b by uid > 1000;");
+ myPig.registerQuery("c = filter a by uid > 5;");
+ myPig.registerQuery("d = filter b by uid > 10;");
myPig.registerQuery("store c into '/tmp/output1';");
myPig.registerQuery("store d into '/tmp/output2';");
myPig.registerQuery("e = cogroup c by uid, d by uid;");
@@ -204,9 +243,6 @@
// XXX the total number of ops is one less in the local case
PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 18);
- // XXX MR plan doesn't seem to work in the local case
- checkMRPlan(pp, 4, 4, 4);
-
Assert.assertTrue(executePlan(pp));
} catch (Exception e) {
@@ -229,14 +265,15 @@
"using PigStorage(':') as (uname:chararray,
passwd:chararray, uid:int,gid:int);");
myPig.registerQuery("b = load
'file:test/org/apache/pig/test/data/passwd2' " +
"using PigStorage(':') as (uname:chararray,
passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("c = filter a by uid > 500;");
- myPig.registerQuery("d = filter b by uid > 1000;");
+ myPig.registerQuery("c = filter a by uid > 5;");
+ myPig.registerQuery("d = filter b by uid > 10;");
myPig.registerQuery("store c into '/tmp/output1';");
myPig.registerQuery("store d into '/tmp/output2';");
myPig.registerQuery("e = cogroup c by uid, d by uid;");
myPig.registerQuery("store e into '/tmp/output3';");
myPig.executeBatch();
+ myPig.discardBatch();
} catch (Exception e) {
e.printStackTrace();
@@ -256,7 +293,7 @@
myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("b = filter a by uid > 500;");
+ myPig.registerQuery("b = filter a by uid > 5;");
myPig.registerQuery("group b by gid;");
LogicalPlan lp = checkLogicalPlan(0, 0, 0);
@@ -264,8 +301,6 @@
// XXX Physical plan has one less node in the local case
PhysicalPlan pp = checkPhysicalPlan(lp, 0, 0, 0);
- //checkMRPlan(pp, 0, 0, 0);
-
//Assert.assertTrue(executePlan(pp));
} catch (Exception e) {
@@ -284,10 +319,11 @@
myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("b = filter a by uid > 500;");
+ myPig.registerQuery("b = filter a by uid > 5;");
myPig.registerQuery("group b by gid;");
myPig.executeBatch();
+ myPig.discardBatch();
} catch (Exception e) {
e.printStackTrace();
@@ -304,7 +340,7 @@
try {
String script = "a = load 'file:test/org/apache/pig/test/data/passwd' "
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"
- + "b = filter a by uid > 500;"
+ + "b = filter a by uid > 5;"
+ "explain b;"
+ "store b into '/tmp/output1';\n";
@@ -329,7 +365,7 @@
try {
String script = "a = load 'file:test/org/apache/pig/test/data/passwd' "
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"
- + "b = filter a by uid > 500;"
+ + "b = filter a by uid > 5;"
+ "dump b;"
+ "store b into '/tmp/output1';\n";
@@ -354,7 +390,7 @@
try {
String script = "a = load 'file:test/org/apache/pig/test/data/passwd' "
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"
- + "b = filter a by uid > 500;"
+ + "b = filter a by uid > 5;"
+ "describe b;"
+ "store b into '/tmp/output1';\n";
@@ -379,7 +415,7 @@
try {
String script = "a = load 'file:test/org/apache/pig/test/data/passwd' "
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"
- + "b = filter a by uid > 500;"
+ + "b = filter a by uid > 5;"
+ "illustrate b;"
+ "store b into '/tmp/output1';\n";
@@ -467,25 +503,6 @@
return pp;
}
- private MROperPlan checkMRPlan(PhysicalPlan pp, int expectedRoots,
- int expectedLeaves, int expectedSize) throws IOException {
-
- System.out.println("===== check map-reduce plan =====");
-
- ExecTools.checkLeafIsStore(pp, myPig.getPigContext());
-
- MRCompiler mrcomp = new MRCompiler(pp, myPig.getPigContext());
- MROperPlan mrp = mrcomp.compile();
-
- Assert.assertEquals(expectedRoots, mrp.getRoots().size());
- Assert.assertEquals(expectedLeaves, mrp.getLeaves().size());
- Assert.assertEquals(expectedSize, mrp.size());
-
- showPlanOperators(mrp);
-
- return mrp;
- }
-
private boolean executePlan(PhysicalPlan pp) throws IOException {
FileLocalizer.clearDeleteOnFail();
ExecJob job = myPig.getPigContext().getExecutionEngine().execute(pp, "execute");