Author: szita Date: Thu Jan 3 15:54:11 2019 New Revision: 1850245 URL: http://svn.apache.org/viewvc?rev=1850245&view=rev Log: PIG-5373: InterRecordReader might skip records if certain sync markers are used (szita)
Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1850245&r1=1850244&r2=1850245&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Thu Jan 3 15:54:11 2019 @@ -88,6 +88,8 @@ OPTIMIZATIONS BUG FIXES +PIG-5373: InterRecordReader might skip records if certain sync markers are used (szita) + PIG-5370: Union onschema + columnprune dropping used fields (knoguchi) PIG-5362: Parameter substitution of shell cmd results doesn't handle backslash (wlauer via rohini) Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=1850245&r1=1850244&r2=1850245&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (original) +++ pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Thu Jan 3 15:54:11 2019 @@ -20,6 +20,7 @@ package org.apache.pig.impl.io; import java.io.DataInputStream; import java.io.IOException; +import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -89,35 +90,34 @@ public class InterRecordReader extends R * @return true if marker was observed, false if EOF or EndOfSplit was reached * @throws IOException */ - private boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException { + public boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException { int b = Integer.MIN_VALUE; -outer:while (b != -1) { - if (b != syncMarker[0]) { + CircularFifoQueue<Integer> queue = new CircularFifoQueue(syncMarker.length); + outer:while (b != -1) { 
+ //There may be a case where we read through a whole split without a marker; then we shouldn't proceed + // because the records are from the next split, which another reader would pick up too. + //The one exception: we may read past the split end if at least the first byte of the marker was seen before + // the split end. + if (in.getPosition() >= (end+syncMarker.length-1)) { + return false; + } + b = in.read(); - //There may be a case where we read through a whole split without a marker, then we shouldn't proceed - // because the records are from the next split which another reader would pick up too - if (in.getPosition() >= end) { - return false; - } - b = in.read(); - if ((byte) b != syncMarker[0] && b != -1) { - continue; - } - if (b == -1) return false; + //EOF reached + if (b == -1) return false; + + queue.add(b); + if (queue.size() != queue.maxSize()) { + //Not enough bytes read yet + continue outer; } - int i = 1; - while (i < syncMarker.length) { - b = in.read(); - if (b == -1) return false; - if ((byte) b != syncMarker[i]) { - if (in.getPosition() > end) { - //Again we should not read past the split end, only if at least the first byte of marker was seen before it - return false; - } + int i = 0; + for (Integer seenByte : queue){ + if (syncMarker[i++] != seenByte.byteValue()) { continue outer; } - ++i; } + //Found marker: queue content equals sync marker lastSyncPos = in.getPosition(); return true; } Modified: pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java?rev=1850245&r1=1850244&r2=1850245&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java (original) +++ pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java Thu Jan 3 15:54:11 2019 @@ -18,6 +18,7 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; 
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -26,6 +27,7 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -42,8 +44,11 @@ import org.apache.pig.data.InterSedes; import org.apache.pig.data.InterSedesFactory; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.io.BufferedPositionedInputStream; +import org.apache.pig.impl.io.InterRecordReader; import org.apache.pig.impl.util.TupleFormat; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; public class TestBinInterSedes { private static final TupleFactory mTupleFactory = TupleFactory.getInstance(); @@ -437,6 +442,45 @@ public class TestBinInterSedes { } + /** + * Tests all combinations where: + * sync marker is {x, y, 4} + * data is {127, -1, 2, z, x, y, 4, 1, 2, 3} + * x,y,z in [-128,127] + * This means that a sync marker has to be found in all iterations (total=16,777,216) + * @throws Exception + */ + @Test + public void testPrefixSyncMarkers() throws Exception { + long defaultInterval = PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT; + + for (int b0 = -128; b0 <= 127; b0++) { + for (int b1 = -128; b1 <= 127; b1++) { + for (int b2 = -128; b2 <= 127; b2++) { + byte[] syncMarker = new byte[]{(byte) b0, (byte) b1, (byte)4}; + byte[] data = new byte[]{127, -1, 2, (byte) b2, (byte) b0, (byte) b1, 4, 1, 2, 3}; + + ByteArrayInputStream bi = new ByteArrayInputStream(data); + BufferedPositionedInputStream bpi = new BufferedPositionedInputStream(bi); + + InterRecordReader reader = new InterRecordReader(syncMarker.length, defaultInterval); + Whitebox.setInternalState(reader, "syncMarker", syncMarker); + Whitebox.setInternalState(reader, "end", data.length); + Whitebox.setInternalState(reader, "in", bpi); + + try { + boolean ret = 
reader.skipUntilMarkerOrSplitEndOrEOF(); + assertTrue("Marker should have been found: " + "marker: " + + Arrays.toString(syncMarker) + " , data: " + Arrays.toString(data),ret); + } finally { + bpi.close(); + } + + } + } + } + } + private void testSerTuple(Tuple t, byte[] expected) throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutput out = new DataOutputStream(baos);