Author: olga
Date: Tue Jun 16 20:27:38 2009
New Revision: 785378
URL: http://svn.apache.org/viewvc?rev=785378&view=rev
Log:
PIG-849: Local engine loses records in splits (hagleitn via olgan)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=785378&r1=785377&r2=785378&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jun 16 20:27:38 2009
@@ -28,6 +28,11 @@
BUG FIXES
+ PIG-852: pig -version or pig -help returns exit code of 1 (milindb via
+ olgan)
+
+ PIG-849: Local engine loses records in splits (hagleitn via olgan)
+
Release 0.3.0 - Unreleased
INCOMPATIBLE CHANGES
@@ -692,5 +697,3 @@
PIG-284: target for building source jar (oae via olgan)
- PIG-852: pig -version or pig -help returns exit code of 1 (milindb via
- olgan)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=785378&r1=785377&r2=785378&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java Tue Jun 16 20:27:38 2009
@@ -68,7 +68,10 @@
if(input.returnStatus == POStatus.STATUS_ERR) {
throw new ExecException("Error accumulating output at local Split operator");
}
- data.add((Tuple) input.result);
+
+ if (input.returnStatus != POStatus.STATUS_NULL) {
+ data.add((Tuple) input.result);
+ }
}
processingDone = true;
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java?rev=785378&r1=785377&r2=785378&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java Tue Jun 16 20:27:38 2009
@@ -67,6 +67,8 @@
@Test
public void testSplit() throws IOException, VisitorException,
ExecException {
+ init();
+
pigContext.connect();
File datFile = File.createTempFile("tempA", ".dat");
@@ -134,6 +136,76 @@
}
}
+ @Test
+ public void testSplitNulls() throws IOException, VisitorException,
ExecException {
+ init();
+
+ pigContext.connect();
+ File datFile = File.createTempFile("tempN1", ".dat");
+ String path1 = Util.encodeEscape(datFile.getAbsolutePath());
+
+ FileOutputStream dat = new FileOutputStream(datFile);
+ String s1 = "1\n2\n3\n42\n4\n5\n";
+ dat.write(s1.getBytes());
+ dat.close();
+
+ String s2 = "1\n2\n43\n3\n4\n5\n";
+
+ datFile = File.createTempFile("tempN2", ".dat");
+ String path2 = Util.encodeEscape(datFile.getAbsolutePath());
+
+ dat = new FileOutputStream(datFile);
+ dat.write(s2.getBytes());
+ dat.close();
+
+ String query = "a = load '"+path1+"'; b = load '"+path2+"'; "+
+ "c = cogroup a by $0, b by $0; d = foreach c generate $0,
flatten($1), flatten($2); "+
+ "split d into e if 1==1, f if 1==1;";
+
+ LogicalPlan plan = buildPlan(query);
+ PhysicalPlan pp = buildPhysicalPlan(plan);
+
+ DataBag[] bag = new DataBag[pp.getLeaves().size()];
+
+ for (int i = 0; i < bag.length; i++) {
+ bag[i] = BagFactory.getInstance().newDefaultBag();
+ }
+
+ for (int i = 0; i < pp.getLeaves().size(); i++) {
+ System.out.println("Leaves: "+i);
+ Tuple t = null;
+ for (Result res = pp.getLeaves().get(i).getNext(t);
res.returnStatus != POStatus.STATUS_EOP; res = pp
+ .getLeaves().get(i).getNext(t)) {
+ if (res.returnStatus == POStatus.STATUS_OK)
+ bag[i].add((Tuple) res.result);
+ System.out.println("Split: "+res.result);
+ }
+ }
+
+ Map<DataByteArray, Integer> seen = new HashMap<DataByteArray,
Integer>();
+ seen.put(new DataByteArray("1".getBytes()), new Integer(0));
+ seen.put(new DataByteArray("2".getBytes()), new Integer(0));
+ seen.put(new DataByteArray("3".getBytes()), new Integer(0));
+ seen.put(new DataByteArray("4".getBytes()), new Integer(0));
+ seen.put(new DataByteArray("5".getBytes()), new Integer(0));
+
+ for (int i = 0; i < bag.length; i++) {
+ DataByteArray value = null;
+ Iterator<Tuple> it = bag[i].iterator();
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ System.out.println("Value: "+t);
+ value = (DataByteArray) t.get(0);
+ Integer count = seen.get(value);
+ seen.put(value, ++count);
+ }
+ }
+
+ for (Integer j: seen.values()) {
+ assertEquals(j, new Integer(2));
+ }
+ }
+
public PhysicalPlan buildPhysicalPlan(LogicalPlan lp)
throws VisitorException {
LocalLogToPhyTranslationVisitor visitor = new
LocalLogToPhyTranslationVisitor(
@@ -153,8 +225,18 @@
LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext); //
try {
- LogicalPlan lp = builder.parse("Test-Plan-Builder", query, aliases,
- logicalOpTable, aliasOp,
fileNameMap);
+ String[] qs = query.split(";");
+ LogicalPlan lp = null;
+ for (String q: qs) {
+ q = q.trim();
+ if (q.equals(""))
+ continue;
+ q += ";";
+ System.out.println(q);
+ lp = builder.parse("Test-Plan-Builder", q, aliases,
+ logicalOpTable, aliasOp, fileNameMap);
+ }
+
List<LogicalOperator> roots = lp.getRoots();
if (roots.size() > 0) {
@@ -238,8 +320,15 @@
}
- Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator,
LogicalPlan>();
- Map<OperatorKey, LogicalOperator> logicalOpTable = new
HashMap<OperatorKey, LogicalOperator>();
- Map<String, LogicalOperator> aliasOp = new HashMap<String,
LogicalOperator>();
- Map<String, String> fileNameMap = new HashMap<String, String>();
+ private void init() {
+ aliases = new HashMap<LogicalOperator, LogicalPlan>();
+ logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
+ aliasOp = new HashMap<String, LogicalOperator>();
+ fileNameMap = new HashMap<String, String>();
+ }
+
+ Map<LogicalOperator, LogicalPlan> aliases;
+ Map<OperatorKey, LogicalOperator> logicalOpTable;
+ Map<String, LogicalOperator> aliasOp;
+ Map<String, String> fileNameMap;
}