Author: pradeepkth
Date: Thu Apr 8 22:06:22 2010
New Revision: 932144
URL: http://svn.apache.org/viewvc?rev=932144&view=rev
Log:
PIG-1366: PigStorage's pushProjection implementation results in NPE under
certain data conditions (pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=932144&r1=932143&r2=932144&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Apr 8 22:06:22 2010
@@ -39,6 +39,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1366: PigStorage's pushProjection implementation results in NPE under
+certain data conditions (pradeepkth)
+
PIG-1365: WrappedIOException is missing from Pig.jar (pradeepkth)
PIG-1313: PigServer leaks memory over time (billgraham via daijy)
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=932144&r1=932143&r2=932144&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Apr 8 22:06:22 2010
@@ -97,6 +97,7 @@ LoadPushDown {
@Override
public Tuple getNext() throws IOException {
+ mProtoTuple = new ArrayList<Object>();
if (!mRequiredColumnsInitialized) {
if (signature!=null) {
Properties p =
UDFContext.getUDFContext().getUDFProperties(this.getClass());
@@ -127,7 +128,6 @@ LoadPushDown {
readField(buf, start, len);
}
Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple);
- mProtoTuple = null;
return t;
} catch (InterruptedException e) {
int errCode = 6018;
@@ -171,10 +171,6 @@ LoadPushDown {
}
private void readField(byte[] buf, int start, int end) {
- if (mProtoTuple == null) {
- mProtoTuple = new ArrayList<Object>();
- }
-
if (start == end) {
// NULL value
mProtoTuple.add(null);
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=932144&r1=932143&r2=932144&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java Thu Apr 8 22:06:22 2010
@@ -26,46 +26,47 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Iterator;
+import java.util.Properties;
+import java.util.Map.Entry;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.data.Tuple;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.Before;
import org.junit.Test;
-public class TestPigStorage {
+public class TestPigStorage {
protected final Log log = LogFactory.getLog(getClass());
private static MiniCluster cluster = MiniCluster.buildCluster();
- private static PigServer pigServer = null;
-
- @BeforeClass
- public static void setup() {
- try {
- pigServer = new PigServer(MAPREDUCE, cluster.getProperties());
- } catch (ExecException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- @AfterClass
- public static void shutdown() {
- pigServer.shutdown();
+ @Before
+ public void setup() {
+ // some tests are in map-reduce mode and some in local - so before
+ // each test, we will de-initialize FileLocalizer so that temp files
+ // are created correctly depending on the ExecType in the test.
+ FileLocalizer.setInitialized(false);
}
@Test
- public void testBlockBoundary() {
+ public void testBlockBoundary() throws ExecException {
// This tests PigStorage loader with records exectly
// on the boundary of the file blocks.
+ Properties props = new Properties();
+ for (Entry<Object, Object> entry : cluster.getProperties().entrySet())
{
+ props.put(entry.getKey(), entry.getValue());
+ }
+ props.setProperty("mapred.max.split.size", "20");
+ PigServer pigServer = new PigServer(MAPREDUCE, props);
String[] inputs = {
"abcdefgh1", "abcdefgh2", "abcdefgh3",
"abcdefgh4", "abcdefgh5", "abcdefgh6",
@@ -115,5 +116,32 @@ public class TestPigStorage {
}
}
}
+
+ /**
+ * Test to verify that PigStorage works fine in the following scenario:
+ * The column prune optimization determines only columns 2 and 3 are needed
+ * and there are records in the data which have only 1 column (malformed data).
+ * In this case, PigStorage should return an empty tuple to represent columns
+ * 2 and 3 and {@link POProject} would handle catching any
+ * {@link IndexOutOfBoundsException} resulting from accessing a field in the
+ * tuple and substitute a null.
+ */
+ @Test
+ public void testPruneColumnsWithMissingFields() throws IOException {
+ String inputFileName =
"TestPigStorage-testPruneColumnsWithMissingFields-input.txt";
+ Util.createLocalInputFile(
+ inputFileName,
+ new String[] {"1\t2\t3", "4", "5\t6\t7"});
+ PigServer ps = new PigServer(ExecType.LOCAL);
+ String script = "a = load '" + inputFileName + "' as (i:int, j:int,
k:int);" +
+ "b = foreach a generate j, k;";
+ Util.registerMultiLineQuery(ps, script);
+ Iterator<Tuple> it = ps.openIterator("b");
+ assertEquals(Util.createTuple(new Integer[] { 2, 3}), it.next());
+ assertEquals(Util.createTuple(new Integer[] { null, null}), it.next());
+ assertEquals(Util.createTuple(new Integer[] { 6, 7}), it.next());
+ assertFalse(it.hasNext());
+
+ }
}