Author: szita
Date: Tue Jan  8 10:37:57 2019
New Revision: 1850723

URL: http://svn.apache.org/viewvc?rev=1850723&view=rev
Log:
PIG-5374: Use CircularFifoBuffer in InterRecordReader (szita)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1850723&r1=1850722&r2=1850723&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jan  8 10:37:57 2019
@@ -88,6 +88,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5374: Use CircularFifoBuffer in InterRecordReader (szita)
+
 PIG-5373: InterRecordReader might skip records if certain sync markers are used (szita)
 
 PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)

Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=1850723&r1=1850722&r2=1850723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Tue Jan  8 10:37:57 2019
@@ -20,7 +20,7 @@ package org.apache.pig.impl.io;
 import java.io.DataInputStream;
 import java.io.IOException;
 
-import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -92,7 +92,7 @@ public class InterRecordReader extends R
      */
   public boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException {
       int b = Integer.MIN_VALUE;
-      CircularFifoQueue<Integer> queue = new CircularFifoQueue(syncMarker.length);
+      CircularFifoBuffer queue = new CircularFifoBuffer(syncMarker.length);
       outer:while (b != -1) {
           //There may be a case where we read through a whole split without a marker, then we shouldn't proceed
           // because the records are from the next split which another reader would pick up too
@@ -107,13 +107,13 @@ public class InterRecordReader extends R
           if (b == -1) return false;
 
           queue.add(b);
-          if (queue.size() != queue.maxSize()) {
+          if (!queue.isFull()) {
               //Not enough bytes read yet
               continue outer;
           }
           int i = 0;
-          for (Integer seenByte : queue){
-              if (syncMarker[i++] != seenByte.byteValue()) {
+          for (Object seenByte : queue){
+              if (syncMarker[i++] != ((Integer)seenByte).byteValue()) {
                   continue outer;
               }
           }


Reply via email to