Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplitOutput.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplitOutput.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplitOutput.java Fri Jan 8 18:17:07 2010
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen.physicalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/**
+ * Reads tuples from POSplit via an iterator and returns those for which the setPlan() condition is true.
+ */
+public class POSplitOutput extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    PhysicalOperator compOp;
+    PhysicalPlan compPlan;
+    transient Iterator<Tuple> it;
+
+    public POSplitOutput(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        // TODO Auto-generated constructor stub
+    }
+
+    public POSplitOutput(OperatorKey k, int rp) {
+        super(k, rp);
+        // TODO Auto-generated constructor stub
+    }
+
+    public POSplitOutput(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+        // TODO Auto-generated constructor stub
+    }
+
+    public POSplitOutput(OperatorKey k) {
+        super(k);
+        // TODO Auto-generated constructor stub
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @SuppressWarnings("unchecked")
+    public Result getNext(Tuple t) throws ExecException {
+        if(it == null) {
+            Result res = getInputs().get(0).getNext(t);
+            if(res.returnStatus != POStatus.STATUS_OK)
+                return res; // no iterator yet (EOP/NULL/ERR): propagate instead of NPE on it.hasNext()
+            it = (Iterator<Tuple>) res.result;
+        }
+        Result res = null;
+        Result inp = new Result();
+        while(true) {
+            if(it.hasNext())
+                inp.result = it.next();
+            else {
+                inp.returnStatus = POStatus.STATUS_EOP;
+                return inp;
+            }
+            inp.returnStatus = POStatus.STATUS_OK;
+
+            compPlan.attachInput((Tuple) inp.result);
+
+            res = compOp.getNext(dummyBool);
+            if (res.returnStatus != POStatus.STATUS_OK
+                    && res.returnStatus != POStatus.STATUS_NULL)
+                return res;
+
+            if (res.result != null && (Boolean) res.result == true) {
+                if(lineageTracer != null) {
+                    ExampleTuple tIn = (ExampleTuple) inp.result;
+                    lineageTracer.insert(tIn);
+                    lineageTracer.union(tIn, tIn);
+                }
+                return inp;
+            }
+        }
+
+    }
+
+    @Override
+    public String name() {
+        // TODO Auto-generated method stub
+        return "POSplitOutput " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    public void setPlan(PhysicalPlan compPlan) {
+        this.compPlan = compPlan;
+        this.compOp = compPlan.getLeaves().get(0);
+    }
+
+}
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POStreamLocal.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POStreamLocal.java?rev=897283&view=auto ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POStreamLocal.java (added) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POStreamLocal.java Fri Jan 8 18:17:07 2010 @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.pen.physicalOperators; + +import java.util.Properties; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.streaming.ExecutableManager; +import org.apache.pig.impl.streaming.StreamingCommand; + +public class POStreamLocal extends POStream { + + /** + * + */ + private static final long serialVersionUID = 2L; + + public POStreamLocal(OperatorKey k, ExecutableManager executableManager, + StreamingCommand command, Properties properties) { + super(k, executableManager, command, properties); + // TODO Auto-generated constructor stub + } + + + /** + * This is different from the Map-Reduce implementation of the POStream since there is no + * push model here. POStatus_EOP signals the end of input and can be used to decide when + * to stop the stdin to the process + */ + @Override + public Result getNext(Tuple t) throws ExecException { + // The POStream Operator works with ExecutableManager to + // send input to the streaming binary and to get output + // from it. To achieve a tuple oriented behavior, two queues + // are used - one for output from the binary and one for + // input to the binary. 
In each getNext() call: + // 1) If there is no more output expected from the binary, an EOP is + // sent to successor + // 2) If there is any output from the binary in the queue, it is passed + // down to the successor + // 3) if neither of these two are true and if it is possible to + // send input to the binary, then the next tuple from the + // predecessor is got and passed to the binary + try { + // if we are being called AFTER all output from the streaming + // binary has already been sent to us then just return EOP + // The "allOutputFromBinaryProcessed" flag is set when we see + // an EOS (End of Stream output) from streaming binary + if(allOutputFromBinaryProcessed) { + return new Result(POStatus.STATUS_EOP, null); + } + + // if we are here AFTER all map() calls have been completed + // AND AFTER we process all possible input to be sent to the + // streaming binary, then all we want to do is read output from + // the streaming binary + if(allInputFromPredecessorConsumed) { + Result r = binaryOutputQueue.take(); + if(r.returnStatus == POStatus.STATUS_EOS) { + // If we received EOS, it means all output + // from the streaming binary has been sent to us + // So we can send an EOP to the successor in + // the pipeline. Also since we are being called + // after all input from predecessor has been processed + // it means we got here from a call from close() in + // map or reduce. So once we send this EOP down, + // getNext() in POStream should never be called. 
So + // we don't need to set any flag noting we saw all output + // from binary + r.returnStatus = POStatus.STATUS_EOP; + } + return(r); + } + + // if we are here, we haven't consumed all input to be sent + // to the streaming binary - check if we are being called + // from close() on the map or reduce + //if(this.parentPlan.endOfAllInput) { + Result r = getNextHelper(t); + if(r.returnStatus == POStatus.STATUS_EOP) { + // we have now seen *ALL* possible input + // check if we ever had any real input + // in the course of the map/reduce - if we did + // then "initialized" will be true. If not, just + // send EOP down. + if(initialized) { + // signal End of ALL input to the Executable Manager's + // Input handler thread + binaryInputQueue.put(r); + // note this state for future calls + allInputFromPredecessorConsumed = true; + // look for output from binary + r = binaryOutputQueue.take(); + if(r.returnStatus == POStatus.STATUS_EOS) { + // If we received EOS, it means all output + // from the streaming binary has been sent to us + // So we can send an EOP to the successor in + // the pipeline. Also since we are being called + // after all input from predecessor has been processed + // it means we got here from a call from close() in + // map or reduce. So once we send this EOP down, + // getNext() in POStream should never be called. So + // we don't need to set any flag noting we saw all output + // from binary + r.returnStatus = POStatus.STATUS_EOP; + } + } + + } else if(r.returnStatus == POStatus.STATUS_EOS) { + // If we received EOS, it means all output + // from the streaming binary has been sent to us + // So we can send an EOP to the successor in + // the pipeline. Also we are being called + // from close() in map or reduce (this is so because + // only then this.parentPlan.endOfAllInput is true). + // So once we send this EOP down, getNext() in POStream + // should never be called. 
So we don't need to set any + // flag noting we saw all output from binary + r.returnStatus = POStatus.STATUS_EOP; + } + return r; +// } else { +// // we are not being called from close() - so +// // we must be called from either map() or reduce() +// // get the next Result from helper +// Result r = getNextHelper(t); +// if(r.returnStatus == POStatus.STATUS_EOS) { +// // If we received EOS, it means all output +// // from the streaming binary has been sent to us +// // So we can send an EOP to the successor in +// // the pipeline and also note this condition +// // for future calls +// r.returnStatus = POStatus.STATUS_EOP; +// allOutputFromBinaryProcessed = true; +// } +// return r; +// } + + } catch(Exception e) { + throw new ExecException("Error while trying to get next result in POStream", e); + } + + + } + + + +} Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java?rev=897283&r1=897282&r2=897283&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java Fri Jan 8 18:17:07 2010 @@ -39,11 +39,13 @@ import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.pig.ExecType; +import org.apache.pig.PigCounters; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; 
+import org.apache.pig.data.BagFactory; import org.apache.pig.impl.util.ObjectSerializer; public class PigStats { @@ -186,6 +188,9 @@ jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_INPUT_RECORDS").getCounter())).toString()); jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_OUTPUT_RECORDS").getCounter())).toString()); jobStats.put("PIG_STATS_BYTES_WRITTEN", (Long.valueOf(hdfsgroup.getCounterForName("HDFS_BYTES_WRITTEN").getCounter())).toString()); + jobStats.put("PIG_STATS_SMM_SPILL_COUNT", (Long.valueOf(counters.findCounter(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT).getCounter())).toString() ); + jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", (Long.valueOf(counters.findCounter(PigCounters.PROACTIVE_SPILL_COUNT).getCounter())).toString() ); + } else { @@ -194,6 +199,8 @@ jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", "-1"); jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", "-1"); jobStats.put("PIG_STATS_BYTES_WRITTEN", "-1"); + jobStats.put("PIG_STATS_SMM_SPILL_COUNT", "-1"); + jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", "-1"); } } catch (IOException e) { @@ -294,6 +301,21 @@ } + /** Sum of PIG_STATS_SMM_SPILL_COUNT over root jobs; -1 if the count is unknown (-1 or missing) for any job. */ + public long getSMMSpillCount() { + long spillCount = 0; + for (String jid : rootJobIDs) { + Map<String, String> jobStats = stats.get(jid); + if (jobStats == null) continue; + String count = jobStats.get("PIG_STATS_SMM_SPILL_COUNT"); + long jobSpills = (count == null) ? -1L : Long.parseLong(count); + if (jobSpills == -1L) + return -1L; // counter unavailable for this job, so the total is unknown + spillCount += jobSpills; + } + return spillCount; + } + private long getLocalBytesWritten() { for(PhysicalOperator op : php.getLeaves()) return Long.parseLong(stats.get(op.toString()).get("PIG_STATS_LOCAL_BYTES_WRITTEN")); Modified: hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml?rev=897283&r1=897282&r2=897283&view=diff
============================================================================== --- hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml (original) +++ hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml Fri Jan 8 18:17:07 2010 @@ -324,5 +324,9 @@ <Field name = "res" /> <Bug pattern="MF_CLASS_MASKS_FIELD" /> </Match> - + <Match> + <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher" /> + <Method name = "launchPig" /> + <Bug pattern="DE_MIGHT_IGNORE" /> + </Match> </FindBugsFilter> Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java?rev=897283&r1=897282&r2=897283&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java Fri Jan 8 18:17:07 2010 @@ -28,8 +28,8 @@ import org.apache.pig.ExecType; import org.apache.pig.PigServer; +import org.apache.pig.test.utils.LocalSeekableInputStream; import org.apache.pig.backend.datastorage.ElementDescriptor; -import org.apache.pig.backend.local.datastorage.LocalSeekableInputStream; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.tools.bzip2r.CBZip2InputStream; Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline2.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline2.java?rev=897283&r1=897282&r2=897283&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline2.java (original) +++ 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline2.java Fri Jan 8 18:17:07 2010 @@ -424,4 +424,39 @@ assertTrue(iter.hasNext()==false); } + + // See PIG-761 + @Test + public void testLimitPOPackageAnnotator() throws Exception{ + File tmpFile1 = File.createTempFile("test1", "txt"); + PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1)); + ps1.println("1\t2\t3"); + ps1.println("2\t5\t2"); + ps1.close(); + + File tmpFile2 = File.createTempFile("test2", "txt"); + PrintStream ps2 = new PrintStream(new FileOutputStream(tmpFile2)); + ps2.println("1\t1"); + ps2.println("2\t2"); + ps2.close(); + + pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = LOAD '" + Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);"); + pigServer.registerQuery("C = LIMIT B 100;"); + pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + + assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})")); + + assertFalse(iter.hasNext()); + } + } Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java?rev=897283&r1=897282&r2=897283&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java Fri Jan 8 18:17:07 2010 @@ -74,7 +74,7 @@ public void testGetAbsolutePath3() throws IOException { // test case: remote hdfs path String 
absPath = "hdfs://myhost.mydomain:37765/data/passwd"; - Assert.assertEquals(curHdfsRoot + "/data/passwd", + Assert.assertEquals(absPath, LoadFunc.getAbsolutePath(absPath, curHdfsDir)); } Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalPOSplit.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalPOSplit.java?rev=897283&r1=897282&r2=897283&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalPOSplit.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalPOSplit.java Fri Jan 8 18:17:07 2010 @@ -37,7 +37,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.local.executionengine.physicalLayer.LocalLogToPhyTranslationVisitor; +import org.apache.pig.pen.LocalLogToPhyTranslationVisitor; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java?rev=897283&r1=897282&r2=897283&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java Fri Jan 8 18:17:07 2010 @@ -111,6 +111,63 @@ public void tearDown() throws Exception { myPig = null; } + + public void testMultiQueryJiraPig1171() { + + // test case: Problems with some top N queries 
+ + String INPUT_FILE = "abc"; + + try { + + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); + w.println("1\tapple\t3"); + w.println("2\torange\t4"); + w.println("3\tpersimmon\t5"); + w.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); + + myPig.setBatchOn(); + + myPig.registerQuery("A = load '" + INPUT_FILE + + "' as (a:long, b, c);"); + myPig.registerQuery("A1 = Order A by a desc;"); + myPig.registerQuery("A2 = limit A1 1;"); + myPig.registerQuery("B = load '" + INPUT_FILE + + "' as (a:long, b, c);"); + myPig.registerQuery("B1 = Order B by a desc;"); + myPig.registerQuery("B2 = limit B1 1;"); + + myPig.registerQuery("C = cross A2, B2;"); + + Iterator<Tuple> iter = myPig.openIterator("C"); + + List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] { + "(3L,'persimmon',5,3L,'persimmon',5)" + }); + + int counter = 0; + while (iter.hasNext()) { + assertEquals(expectedResults.get(counter++).toString(), iter.next().toString()); + } + + assertEquals(expectedResults.size(), counter); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + new File(INPUT_FILE).delete(); + try { + Util.deleteFile(cluster, INPUT_FILE); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + } public void testMultiQueryJiraPig1157() { Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCogroup.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCogroup.java?rev=897283&r1=897282&r2=897283&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCogroup.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCogroup.java Fri Jan 8 18:17:07 2010 @@ -32,7 +32,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; 
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead; -import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup; +import org.apache.pig.pen.physicalOperators.POCogroup; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCross.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCross.java?rev=897283&r1=897282&r2=897283&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCross.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCross.java Fri Jan 8 18:17:07 2010 @@ -28,7 +28,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead; -import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCross; +import org.apache.pig.pen.physicalOperators.POCross; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java?rev=897283&r1=897282&r2=897283&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java (original) +++ 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java Fri Jan 8 18:17:07 2010 @@ -1656,4 +1656,115 @@ "No map keys pruned for C"})); } + // See PIG-1165 + @Test + public void testOrderbyWrongSignature() throws Exception { + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);"); + pigServer.registerQuery("C = order A by a1;"); + pigServer.registerQuery("D = join C by a1, B by b0;"); + pigServer.registerQuery("E = foreach D generate a1, b0, b1;"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + + assertTrue(t.size()==3); + assertTrue(t.toString().equals("(2,2,2)")); + + assertFalse(iter.hasNext()); + + assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $2", + "No map keys pruned for A", "No column pruned for B", "No map keys pruned for B"})); + } + + // See PIG-1146 + @Test + public void testUnionMixedPruning() throws Exception { + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b2);"); + pigServer.registerQuery("C = foreach B generate b0, 'hello', b2;"); + pigServer.registerQuery("D = union A, C;"); + pigServer.registerQuery("E = foreach D generate $0, $2;"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + Collection<String> results = new HashSet<String>(); + results.add("(1,3)"); + results.add("(2,2)"); + results.add("(1,1)"); + results.add("(2,2)"); + + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + + assertTrue(t.size()==2); + assertTrue(results.contains(t.toString())); + + assertTrue(iter.hasNext()); + t = iter.next(); + + 
assertTrue(t.size()==2); + assertTrue(results.contains(t.toString())); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.size()==2); + assertTrue(results.contains(t.toString())); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.size()==2); + assertTrue(results.contains(t.toString())); + + assertFalse(iter.hasNext()); + + assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1", + "No map keys pruned for A", "No column pruned for B", + "No map keys pruned for B"})); + } + + // See PIG-1176 + @Test + public void testUnionMixedSchemaPruning() throws Exception { + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = foreach A generate a0;;"); + pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "';"); + pigServer.registerQuery("D = foreach C generate $0;"); + pigServer.registerQuery("E = union B, D;"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + Collection<String> results = new HashSet<String>(); + results.add("(1)"); + results.add("(2)"); + results.add("(1)"); + results.add("(2)"); + + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + + assertTrue(t.size()==1); + assertTrue(results.contains(t.toString())); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.size()==1); + assertTrue(results.contains(t.toString())); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.size()==1); + assertTrue(results.contains(t.toString())); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.size()==1); + assertTrue(results.contains(t.toString())); + + assertFalse(iter.hasNext()); + + assertTrue(emptyLogFileMessage()); + } } Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPushDownForeachFlatten.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPushDownForeachFlatten.java?rev=897283&r1=897282&r2=897283&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPushDownForeachFlatten.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPushDownForeachFlatten.java Fri Jan 8 18:17:07 2010 @@ -977,5 +977,27 @@ } + // See PIG-1172 + @Test + public void testForeachJoinRequiredField() throws Exception { + planTester.buildPlan("A = load 'myfile' as (bg:bag{t:tuple(a0,a1)});"); + planTester.buildPlan("B = FOREACH A generate flatten($0);"); + planTester.buildPlan("C = load '3.txt' AS (c0, c1);"); + planTester.buildPlan("D = JOIN B by a1, C by c1;"); + LogicalPlan lp = planTester.buildPlan("E = limit D 10;"); + + planTester.setPlan(lp); + planTester.setProjectionMap(lp); + planTester.rebuildSchema(lp); + + PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp); + + LOLoad loada = (LOLoad) lp.getRoots().get(0); + + assertTrue(!pushDownForeach.check(lp.getSuccessors(loada))); + assertTrue(pushDownForeach.getSwap() == false); + assertTrue(pushDownForeach.getInsertBetween() == false); + } + } Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java?rev=897283&r1=897282&r2=897283&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java Fri Jan 8 18:17:07 2010 @@ -17,12 +17,12 @@ */ package org.apache.pig.test; +import java.io.File; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import 
java.io.IOException; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.Random; import java.util.Set; @@ -44,6 +44,12 @@ import org.apache.pig.data.DefaultTuple; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; +import org.apache.pig.pen.physicalOperators.POCounter; +import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.impl.logicalLayer.LOStore; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; @@ -58,22 +64,24 @@ import org.junit.Test; public class TestStore extends junit.framework.TestCase { - + POStore st; DataBag inpDB; static MiniCluster cluster = MiniCluster.buildCluster(); - PigServer pig; PigContext pc; + POProject proj; + PigServer pig; + POCounter pcount; + String inputFileName; String outputFileName; - + @Override @Before public void setUp() throws Exception { pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); pc = pig.getPigContext(); inputFileName = "/tmp/TestStore-" + new Random().nextLong() + ".txt"; - outputFileName = "/tmp/TestStore-output-" + new Random().nextLong() + - ".txt"; + outputFileName = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt"; } @Override @@ -87,14 +95,26 @@ private void storeAndCopyLocally(DataBag inpDB) throws Exception { setUpInputFileOnCluster(inpDB); String script = "a = load '" + inputFileName + "'; " + - "store a into '" + outputFileName + "' using PigStorage(':');" + - "fs -ls /tmp"; + "store a into '" + outputFileName + "' using PigStorage(':');" + + "fs -ls /tmp"; pig.setBatchOn(); 
Util.registerMultiLineQuery(pig, script); pig.executeBatch(); Util.copyFromClusterToLocal(cluster, outputFileName + "/part-m-00000", outputFileName); } + private PigStats store() throws Exception { + PhysicalPlan pp = new PhysicalPlan(); + pp.add(proj); + pp.add(st); + pp.add(pcount); + //pp.connect(proj, st); + pp.connect(proj, pcount); + pp.connect(pcount, st); + pc.setExecType(ExecType.LOCAL); + return new MapReduceLauncher().launchPig(pp, "TestStore", pc); + } + @Test public void testStore() throws Exception { inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100); @@ -159,7 +179,6 @@ inpDB = GenRandomData.genRandFullTupTextDataBag(new Random(), 10, 100); storeAndCopyLocally(inpDB); PigStorage ps = new PigStorage(":"); - int size = 0; BufferedReader br = new BufferedReader(new FileReader(outputFileName)); for(String line=br.readLine();line!=null;line=br.readLine()){ Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LocalSeekableInputStream.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LocalSeekableInputStream.java?rev=897283&view=auto ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LocalSeekableInputStream.java (added) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LocalSeekableInputStream.java Fri Jan 8 18:17:07 2010 @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pig.test.utils;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

import org.apache.pig.backend.datastorage.*;

/**
 * A {@link SeekableInputStream} over a local file, backed by a read-only
 * {@link RandomAccessFile}.
 *
 * <p>All positioning ({@link #seek}, {@link #skip}, {@link #reset}) is done by
 * moving the underlying file pointer, so mark/reset works without buffering.
 */
public class LocalSeekableInputStream extends SeekableInputStream {

    /** Value stored in {@link #curMark} when the last mark() could not record the position. */
    private static final long INVALID_MARK = -1L;

    /** Read-only handle on the underlying local file. */
    protected RandomAccessFile file;

    /**
     * Offset restored by {@link #reset()}. It starts at 0 (beginning of file).
     * It is negative if the last {@link #mark(int)} call failed.
     */
    protected long curMark;

    /**
     * Opens {@code file} for reading, positioned at offset 0.
     *
     * @param file local file to read
     * @throws FileNotFoundException if the file cannot be opened for reading
     */
    public LocalSeekableInputStream(File file) throws FileNotFoundException {
        this.file = new RandomAccessFile(file, "r");
        this.curMark = 0;
    }

    /**
     * Moves the file pointer.
     *
     * @param offset byte offset, interpreted according to {@code whence}
     * @param whence how {@code offset} is interpreted:
     *        {@code SEEK_SET} makes it absolute, {@code SEEK_CUR} makes it
     *        relative to the current position, and {@code SEEK_END} makes it
     *        relative to the file length
     * @throws IOException if {@code whence} is unrecognized or the underlying
     *         seek fails (e.g. the resulting position is negative)
     */
    @Override
    public void seek(long offset, FLAGS whence) throws IOException {
        long targetPos;

        switch (whence) {
        case SEEK_SET:
            targetPos = offset;
            break;
        case SEEK_CUR:
            targetPos = this.file.getFilePointer() + offset;
            break;
        case SEEK_END:
            targetPos = this.file.length() + offset;
            break;
        default:
            throw new IOException("Invalid seek option: " + whence);
        }

        this.file.seek(targetPos);
    }

    /** @return the current byte offset of the file pointer */
    @Override
    public long tell() throws IOException {
        return this.file.getFilePointer();
    }

    @Override
    public int read() throws IOException {
        return this.file.read();
    }

    @Override
    public int read(byte[] b) throws IOException {
        return this.file.read(b);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return this.file.read(b, off, len);
    }

    /**
     * Returns the number of bytes between the file pointer and end of file.
     *
     * <p>The result is clamped to {@code [0, Integer.MAX_VALUE]}. The raw
     * difference is negative when the pointer sits past EOF. It can also
     * exceed the {@code int} range for large files, where a plain cast would
     * wrap around.
     */
    @Override
    public int available() throws IOException {
        long remaining = this.file.length() - this.file.getFilePointer();
        if (remaining <= 0) {
            return 0;
        }
        return (int) Math.min(remaining, Integer.MAX_VALUE);
    }

    /**
     * Skips forward up to {@code n} bytes, stopping at end of file.
     *
     * @param n number of bytes to skip; values {@code <= 0} skip nothing
     * @return the number of bytes actually skipped, as required by
     *         {@link java.io.InputStream#skip(long)}
     */
    @Override
    public long skip(long n) throws IOException {
        if (n <= 0) {
            return 0;
        }
        long remaining = this.file.length() - tell();
        if (remaining <= 0) {
            return 0;
        }
        // Clamp so the pointer never moves past EOF and the return value is
        // the true number of bytes skipped, not the number that were left.
        long skipped = Math.min(n, remaining);
        seek(skipped, FLAGS.SEEK_CUR);
        return skipped;
    }

    @Override
    public void close() throws IOException {
        this.file.close();
    }

    /**
     * Records the current position for a later {@link #reset()}.
     *
     * @param readlimit ignored; {@link #reset()} seeks directly to the marked
     *        offset, so the mark stays valid however much is read
     */
    @Override
    public void mark(int readlimit) {
        try {
            this.curMark = tell();
        } catch (IOException e) {
            // mark() cannot throw a checked exception. Invalidate the mark so
            // that reset() fails loudly instead of jumping to a stale offset.
            this.curMark = INVALID_MARK;
        }
    }

    /**
     * Returns the file pointer to the last marked position. If
     * {@link #mark(int)} was never called, this is offset 0.
     *
     * @throws IOException if the last {@link #mark(int)} could not record the
     *         position, or if the seek fails
     */
    @Override
    public void reset() throws IOException {
        if (this.curMark < 0) {
            throw new IOException("Cannot reset: the last mark() failed to record the stream position");
        }
        seek(this.curMark, FLAGS.SEEK_SET);
    }

    /** @return {@code true}; mark/reset are implemented by seeking */
    @Override
    public boolean markSupported() {
        return true;
    }
}
