Author: pradeepkth Date: Thu Apr 8 22:06:22 2010 New Revision: 932144 URL: http://svn.apache.org/viewvc?rev=932144&view=rev Log: PIG-1366: PigStorage's pushProjection implementation results in NPE under certain data conditions (pradeepkth)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=932144&r1=932143&r2=932144&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Apr 8 22:06:22 2010 @@ -39,6 +39,9 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1366: PigStorage's pushProjection implementation results in NPE under +certain data conditions (pradeepkth) + PIG-1365: WrappedIOException is missing from Pig.jar (pradeepkth) PIG-1313: PigServer leaks memory over time (billgraham via daijy) Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=932144&r1=932143&r2=932144&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Apr 8 22:06:22 2010 @@ -97,6 +97,7 @@ LoadPushDown { @Override public Tuple getNext() throws IOException { + mProtoTuple = new ArrayList<Object>(); if (!mRequiredColumnsInitialized) { if (signature!=null) { Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass()); @@ -127,7 +128,6 @@ LoadPushDown { readField(buf, start, len); } Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple); - mProtoTuple = null; return t; } catch (InterruptedException e) { int errCode = 6018; @@ -171,10 +171,6 @@ LoadPushDown { } private void readField(byte[] buf, int start, int end) { - if (mProtoTuple == null) { - mProtoTuple = new ArrayList<Object>(); - } - if (start == end) { // NULL value mProtoTuple.add(null); Modified: 
hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=932144&r1=932143&r2=932144&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java Thu Apr 8 22:06:22 2010 @@ -26,46 +26,47 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.util.Iterator; +import java.util.Properties; +import java.util.Map.Entry; import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.data.Tuple; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.apache.pig.impl.io.FileLocalizer; +import org.junit.Before; import org.junit.Test; -public class TestPigStorage { +public class TestPigStorage { protected final Log log = LogFactory.getLog(getClass()); private static MiniCluster cluster = MiniCluster.buildCluster(); - private static PigServer pigServer = null; - - @BeforeClass - public static void setup() { - try { - pigServer = new PigServer(MAPREDUCE, cluster.getProperties()); - } catch (ExecException e) { - e.printStackTrace(); - Assert.fail(); - } - } - - @AfterClass - public static void shutdown() { - pigServer.shutdown(); + @Before + public void setup() { + // some tests are in map-reduce mode and some in local - so before + // each test, we will de-initialize FileLocalizer so that temp files + // are created correctly depending on the ExecType in the test. 
+ FileLocalizer.setInitialized(false); } @Test - public void testBlockBoundary() { + public void testBlockBoundary() throws ExecException { // This tests PigStorage loader with records exectly // on the boundary of the file blocks. + Properties props = new Properties(); + for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) { + props.put(entry.getKey(), entry.getValue()); + } + props.setProperty("mapred.max.split.size", "20"); + PigServer pigServer = new PigServer(MAPREDUCE, props); String[] inputs = { "abcdefgh1", "abcdefgh2", "abcdefgh3", "abcdefgh4", "abcdefgh5", "abcdefgh6", @@ -115,5 +116,32 @@ public class TestPigStorage { } } } + + /** + * Test to verify that PigStorage works fine in the following scenario: + * The column prune optimization determines only columns 2 and 3 are needed + * and there are records in the data which have only 1 column (malformed data). + * In this case, PigStorage should return an empty tuple to represent columns + * 2 and 3 and {@link POProject} would handle catching any + * {@link IndexOutOfBoundsException} resulting from accessing a field in the + * tuple and substitute a null. + */ + @Test + public void testPruneColumnsWithMissingFields() throws IOException { + String inputFileName = "TestPigStorage-testPruneColumnsWithMissingFields-input.txt"; + Util.createLocalInputFile( + inputFileName, + new String[] {"1\t2\t3", "4", "5\t6\t7"}); + PigServer ps = new PigServer(ExecType.LOCAL); + String script = "a = load '" + inputFileName + "' as (i:int, j:int, k:int);" + + "b = foreach a generate j, k;"; + Util.registerMultiLineQuery(ps, script); + Iterator<Tuple> it = ps.openIterator("b"); + assertEquals(Util.createTuple(new Integer[] { 2, 3}), it.next()); + assertEquals(Util.createTuple(new Integer[] { null, null}), it.next()); + assertEquals(Util.createTuple(new Integer[] { 6, 7}), it.next()); + assertFalse(it.hasNext()); + + } }