Author: szita
Date: Tue Jan 8 10:37:57 2019
New Revision: 1850723

URL: http://svn.apache.org/viewvc?rev=1850723&view=rev
Log:
PIG-5374: Use CircularFifoBuffer in InterRecordReader (szita)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1850723&r1=1850722&r2=1850723&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jan 8 10:37:57 2019
@@ -88,6 +88,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-5374: Use CircularFifoBuffer in InterRecordReader (szita)
+
 PIG-5373: InterRecordReader might skip records if certain sync markers are used (szita)
 
 PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)

Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=1850723&r1=1850722&r2=1850723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Tue Jan 8 10:37:57 2019
@@ -20,7 +20,7 @@ package org.apache.pig.impl.io;
 import java.io.DataInputStream;
 import java.io.IOException;
 
-import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -92,7 +92,7 @@ public class InterRecordReader extends R
      */
     public boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException {
         int b = Integer.MIN_VALUE;
-        CircularFifoQueue<Integer> queue = new CircularFifoQueue(syncMarker.length);
+        CircularFifoBuffer queue = new CircularFifoBuffer(syncMarker.length);
         outer:while (b != -1) {
             //There may be a case where we read through a whole split without a marker, then we shouldn't proceed
             // because the records are from the next split which another reader would pick up too
@@ -107,13 +107,13 @@ public class InterRecordReader extends R
             if (b == -1) return false;
 
             queue.add(b);
-            if (queue.size() != queue.maxSize()) {
+            if (!queue.isFull()) {
                 //Not enough bytes read yet
                 continue outer;
             }
             int i = 0;
-            for (Integer seenByte : queue){
-                if (syncMarker[i++] != seenByte.byteValue()) {
+            for (Object seenByte : queue){
+                if (syncMarker[i++] != ((Integer)seenByte).byteValue()) {
                     continue outer;
                 }
             }