Author: olga
Date: Tue Jun 16 20:27:38 2009
New Revision: 785378

URL: http://svn.apache.org/viewvc?rev=785378&view=rev
Log:
PIG-849: Local engine loses records in splits (hagleitn via olgan)

Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=785378&r1=785377&r2=785378&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Tue Jun 16 20:27:38 2009 @@ -28,6 +28,11 @@ BUG FIXES + PIG-852: pig -version or pig -help returns exit code of 1 (milindb via + olgan) + + PIG-849: Local engine loses records in splits (hagleitn via olgan) + Release 0.3.0 - Unreleased INCOMPATIBLE CHANGES @@ -692,5 +697,3 @@ PIG-284: target for building source jar (oae via olgan) - PIG-852: pig -version or pig -help returns exit code of 1 (milindb via - olgan) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=785378&r1=785377&r2=785378&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java Tue Jun 16 20:27:38 2009 @@ -68,7 +68,10 @@ if(input.returnStatus == POStatus.STATUS_ERR) { throw new ExecException("Error accumulating output at local Split operator"); } - data.add((Tuple) input.result); + + if (input.returnStatus != POStatus.STATUS_NULL) { + data.add((Tuple) input.result); + } } processingDone = true; } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java?rev=785378&r1=785377&r2=785378&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java Tue Jun 16 20:27:38 2009 @@ -67,6 +67,8 @@ @Test public void testSplit() throws IOException, VisitorException, ExecException { + init(); + pigContext.connect(); File datFile = File.createTempFile("tempA", ".dat"); @@ -134,6 +136,76 @@ } } + @Test + public void testSplitNulls() throws IOException, VisitorException, ExecException { + init(); + + pigContext.connect(); + File datFile = File.createTempFile("tempN1", ".dat"); + String path1 = Util.encodeEscape(datFile.getAbsolutePath()); + + FileOutputStream dat = new FileOutputStream(datFile); + String s1 = "1\n2\n3\n42\n4\n5\n"; + dat.write(s1.getBytes()); + dat.close(); + + String s2 = "1\n2\n43\n3\n4\n5\n"; + + datFile = File.createTempFile("tempN2", ".dat"); + String path2 = Util.encodeEscape(datFile.getAbsolutePath()); + + dat = new FileOutputStream(datFile); + dat.write(s2.getBytes()); + dat.close(); + + String query = "a = load '"+path1+"'; b = load '"+path2+"'; "+ + "c = cogroup a by $0, b by $0; d = foreach c generate $0, flatten($1), flatten($2); "+ + "split d into e if 1==1, f if 1==1;"; + + LogicalPlan plan = buildPlan(query); + PhysicalPlan pp = buildPhysicalPlan(plan); + + DataBag[] bag = new DataBag[pp.getLeaves().size()]; + + for (int i = 0; i < bag.length; i++) { + bag[i] = BagFactory.getInstance().newDefaultBag(); + } + + for (int i = 0; i < pp.getLeaves().size(); i++) { + System.out.println("Leaves: "+i); + Tuple t = null; + for (Result res = pp.getLeaves().get(i).getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = pp + .getLeaves().get(i).getNext(t)) { + if (res.returnStatus == POStatus.STATUS_OK) + bag[i].add((Tuple) res.result); + 
System.out.println("Split: "+res.result); + } + } + + Map<DataByteArray, Integer> seen = new HashMap<DataByteArray, Integer>(); + seen.put(new DataByteArray("1".getBytes()), new Integer(0)); + seen.put(new DataByteArray("2".getBytes()), new Integer(0)); + seen.put(new DataByteArray("3".getBytes()), new Integer(0)); + seen.put(new DataByteArray("4".getBytes()), new Integer(0)); + seen.put(new DataByteArray("5".getBytes()), new Integer(0)); + + for (int i = 0; i < bag.length; i++) { + DataByteArray value = null; + Iterator<Tuple> it = bag[i].iterator(); + while (it.hasNext()) { + Tuple t = it.next(); + System.out.println("Value: "+t); + value = (DataByteArray) t.get(0); + Integer count = seen.get(value); + seen.put(value, ++count); + } + } + + for (Integer j: seen.values()) { + assertEquals(j, new Integer(2)); + } + } + public PhysicalPlan buildPhysicalPlan(LogicalPlan lp) throws VisitorException { LocalLogToPhyTranslationVisitor visitor = new LocalLogToPhyTranslationVisitor( @@ -153,8 +225,18 @@ LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext); // try { - LogicalPlan lp = builder.parse("Test-Plan-Builder", query, aliases, - logicalOpTable, aliasOp, fileNameMap); + String[] qs = query.split(";"); + LogicalPlan lp = null; + for (String q: qs) { + q = q.trim(); + if (q.equals("")) + continue; + q += ";"; + System.out.println(q); + lp = builder.parse("Test-Plan-Builder", q, aliases, + logicalOpTable, aliasOp, fileNameMap); + } + List<LogicalOperator> roots = lp.getRoots(); if (roots.size() > 0) { @@ -238,8 +320,15 @@ } - Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>(); - Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>(); - Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>(); - Map<String, String> fileNameMap = new HashMap<String, String>(); + private void init() { + aliases = new HashMap<LogicalOperator, LogicalPlan>(); + logicalOpTable = new 
HashMap<OperatorKey, LogicalOperator>(); + aliasOp = new HashMap<String, LogicalOperator>(); + fileNameMap = new HashMap<String, String>(); + } + + Map<LogicalOperator, LogicalPlan> aliases; + Map<OperatorKey, LogicalOperator> logicalOpTable; + Map<String, LogicalOperator> aliasOp; + Map<String, String> fileNameMap; }