Author: szita Date: Sat Jul 22 11:38:47 2017 New Revision: 1802676 URL: http://svn.apache.org/viewvc?rev=1802676&view=rev Log: PIG-3655: BinStorage and InterStorage approach to record markers is broken (szita)
Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/PigConfiguration.java pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java pig/trunk/src/org/apache/pig/impl/io/InterStorage.java pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java pig/trunk/test/org/apache/pig/test/TestFRJoin2.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1802676&r1=1802675&r2=1802676&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Sat Jul 22 11:38:47 2017 @@ -38,6 +38,8 @@ OPTIMIZATIONS BUG FIXES +PIG-3655: BinStorage and InterStorage approach to record markers is broken (szita) + PIG-5274: TestEvalPipelineLocal#testSetLocationCalledInFE is failing in spark mode after PIG-5157 (nkollar via szita) PIG-4767: Partition filter not pushed down when filter clause references variable from another load path (knoguchi) Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1802676&r1=1802675&r2=1802676&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/PigConfiguration.java (original) +++ pig/trunk/src/org/apache/pig/PigConfiguration.java Sat Jul 22 11:38:47 2017 @@ -40,6 +40,24 @@ public class PigConfiguration { */ public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = "pig.auto.local.input.maxbytes"; + + /** + * Sets the length of record markers in binary files produced by Pig between jobs. + * A longer byte sequence means less chance of collision with actual data; + * a shorter sequence means less overhead + */ + public static final String PIG_INTERSTORAGE_SYNCMARKER_SIZE = "pig.interstorage.syncmarker.size"; + public static final int
PIG_INTERSTORAGE_SYNCMARKER_SIZE_MAX = 16; + public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT = 10; + public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_MIN = 2; + + /** + * Defines the interval (in bytes) when a sync marker should be written into the binary file + */ + public static final String PIG_INTERSTORAGE_SYNCMARKER_INTERVAL = "pig.interstorage.syncmarker.interval"; + public static final long PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT = 2000; + + /** * Boolean value used to enable or disable fetching without a mapreduce job for DUMP. True by default */ Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=1802676&r1=1802675&r2=1802676&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (original) +++ pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Sat Jul 22 11:38:47 2017 @@ -42,16 +42,23 @@ import org.apache.pig.data.Tuple; public class InterRecordReader extends RecordReader<Text, Tuple> { private long start; - private long pos; + private long lastDataPos; private long end; private BufferedPositionedInputStream in; private Tuple value = null; - public static final int RECORD_1 = 0x01; - public static final int RECORD_2 = 0x02; - public static final int RECORD_3 = 0x03; private DataInputStream inData = null; private static InterSedes sedes = InterSedesFactory.getInterSedesInstance(); + private byte[] syncMarker; + private long lastSyncPos = -1; + private long syncMarkerInterval; + private long dataBytesSeen = 0; + + public InterRecordReader(int syncMarkerLength, long syncMarkerInterval) { + this.syncMarker = new byte[syncMarkerLength]; + this.syncMarkerInterval = syncMarkerInterval; + } + public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = 
(FileSplit) genericSplit; @@ -60,63 +67,131 @@ public class InterRecordReader extends R end = start + split.getLength(); final Path file = split.getPath(); - // open the file and seek to the start of the split + // open the file FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); - if (start != 0) { - fileIn.seek(start); + + // read the magic byte sequence serving as record marker but only if the file is not empty + if (!(start == 0 && end == 0)) { + fileIn.readFully(0, syncMarker, 0, syncMarker.length); } + + //seek to the start of the split + fileIn.seek(start); + in = new BufferedPositionedInputStream(fileIn, start); inData = new DataInputStream(in); } - - public boolean nextKeyValue() throws IOException { + + + /** + * Skips to next sync marker + * @return true if marker was observed, false if EOF or EndOfSplit was reached + * @throws IOException + */ + private boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException { int b = 0; - // skip to next record - while (true) { - if (in == null || in.getPosition() >=end) { - return false; - } - // check if we saw RECORD_1 in our last attempt - // this can happen if we have the following - // sequence RECORD_1-RECORD_1-RECORD_2-RECORD_3 - // After reading the second RECORD_1 in the above - // sequence, we should not look for RECORD_1 again - if(b != RECORD_1) { +outer:while (b != -1) { + if (b != syncMarker[0]) { + + //There may be a case where we read through a whole split without a marker, then we shouldn't proceed + // because the records are from the next split which another reader would pick up too + if (in.getPosition() >= end) { + return false; + } b = in.read(); - if(b != RECORD_1 && b != -1) { + if ((byte) b != syncMarker[0] && b != -1) { continue; } - if(b == -1) return false; + if (b == -1) return false; } - b = in.read(); - if(b != RECORD_2 && b != -1) { - continue; + int i = 1; + while (i < syncMarker.length) { + b = in.read(); + if (b == -1) return false; + if 
((byte) b != syncMarker[i]) { + continue outer; + } + ++i; } - if(b == -1) return false; + lastSyncPos = in.getPosition(); + return true; + } + return false; + } + + /** + * Reads a sync marker + * @return true if sync marker was read, false if EOF reached + * @throws IOException thrown if neither EOF nor proper sync was found + */ + private boolean readSyncFullyOrEOF() throws IOException { + int b = in.read(); + if (b == -1) { + //EOF reached + return false; + } + if ((byte) b != syncMarker[0]) { + throw new IOException("Corrupt data file, expected sync marker at position " + in.getPosition()); + } + int i = 1; + while (i < syncMarker.length) { b = in.read(); - if(b != RECORD_3 && b != -1) { - continue; + if ((byte) b != syncMarker[i]) { + throw new IOException("Corrupt data file, expected sync marker at position " + in.getPosition()); } - if(b == -1) return false; - b = in.read(); - if(!BinInterSedes.isTupleByte((byte) b) && - b != -1) { - continue; + ++i; + } + lastSyncPos = in.getPosition(); + return true; + + } + + private boolean readDataOrEOF() throws IOException { + long preDataPos = in.getPosition(); + int b = in.read(); + if(!BinInterSedes.isTupleByte((byte) b) ) { + if (b == -1) { + //EOF reached + return false; + } else { + throw new IOException("Corrupt data file, expected tuple type byte, but seen " + b); } - if(b == -1) return false; - break; } try { - // if we got here, we have seen RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER - // sequence - lets now read the contents of the tuple value = (Tuple)sedes.readDatum(inData, (byte)b); - pos=in.getPosition(); + lastDataPos = in.getPosition(); + dataBytesSeen += (lastDataPos-preDataPos); return true; } catch (ExecException ee) { throw ee; } + } + + public boolean nextKeyValue() throws IOException { + + //No marker has been seen, look for next marker + if (lastSyncPos == -1) { + if (!skipUntilMarkerOrSplitEndOrEOF()) { + return false; + } + } + + //If we've read more or equal amount of data than the sync 
interval, we expect a sync marker or EOF + if (dataBytesSeen >= syncMarkerInterval) { + boolean isEOF = !readSyncFullyOrEOF(); + if (isEOF) { + return false; + } + dataBytesSeen = 0; + //If we've just seen a (non-first) sync marker which was completely in the next split then we need to stop + if (in.getPosition()-syncMarker.length >= end) { + return false; + } + } + //Sync marker has been seen, expect data + return readDataOrEOF(); } @Override @@ -138,7 +213,7 @@ public class InterRecordReader extends R if (start == end) { return 0.0f; } else { - return Math.min(1.0f, (pos - start) / (float)(end - start)); + return Math.min(1.0f, (lastDataPos - start) / (float)(end - start)); } } Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java?rev=1802676&r1=1802675&r2=1802676&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java (original) +++ pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java Sat Jul 22 11:38:47 2017 @@ -17,12 +17,16 @@ */ package org.apache.pig.impl.io; -import java.io.DataOutputStream; import java.io.IOException; +import java.rmi.server.UID; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.Time; import org.apache.pig.data.InterSedes; import org.apache.pig.data.InterSedesFactory; import org.apache.pig.data.Tuple; @@ -35,20 +39,34 @@ import org.apache.pig.data.Tuple; public class InterRecordWriter extends RecordWriter<org.apache.hadoop.io.WritableComparable, Tuple> { - public static final int RECORD_1 = 0x01; - public static final int RECORD_2 = 0x02; - public static final int 
RECORD_3 = 0x03; private static InterSedes sedes = InterSedesFactory.getInterSedesInstance(); + + private byte[] syncMarker; + private long lastSyncPos = -1; + private long syncMarkerInterval; /** * the outputstream to write out on */ - private DataOutputStream out; + private FSDataOutputStream out; /** * */ - public InterRecordWriter(DataOutputStream out) { + public InterRecordWriter(FSDataOutputStream out, int syncMarkerLength, long syncMarkerInterval) { this.out = out; + this.syncMarkerInterval = syncMarkerInterval; + syncMarker = new byte[syncMarkerLength]; + + try { + MessageDigest digester = MessageDigest.getInstance("MD5"); + long time = Time.now(); + digester.update((new UID()+"@"+time).getBytes()); + byte[] generatedMarker = digester.digest(); + System.arraycopy(generatedMarker, 0, syncMarker, 0, syncMarkerLength); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } /* (non-Javadoc) @@ -66,10 +84,11 @@ public class InterRecordWriter extends @Override public void write(WritableComparable wc, Tuple t) throws IOException, InterruptedException { - // we really only want to write the tuple (value) out here - out.write(RECORD_1); - out.write(RECORD_2); - out.write(RECORD_3); + // we really only want to write the tuple (value) out here (and a sync marker before that if necessary) + if (lastSyncPos == -1 || out.getPos() >= (lastSyncPos + syncMarkerInterval)) { + out.write(syncMarker); + lastSyncPos = out.getPos(); + } sedes.writeDatum(out, t); } Modified: pig/trunk/src/org/apache/pig/impl/io/InterStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java?rev=1802676&r1=1802675&r2=1802676&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/io/InterStorage.java (original) +++ pig/trunk/src/org/apache/pig/impl/io/InterStorage.java Sat Jul 22 11:38:47 2017 @@ -39,6 +39,7 @@ import org.apache.pig.Expression; import
org.apache.pig.FileInputLoadFunc; import org.apache.pig.LoadFunc; import org.apache.pig.LoadMetadata; +import org.apache.pig.PigConfiguration; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceStatistics; import org.apache.pig.StoreFunc; @@ -62,10 +63,10 @@ implements StoreFuncInterface, LoadMetad private static final Log mLog = LogFactory.getLog(InterStorage.class); public static final String useLog = "Pig Internal storage in use"; - + private InterRecordReader recReader = null; private InterRecordWriter recWriter = null; - + /** * Simple binary nested reader format */ @@ -102,7 +103,9 @@ implements StoreFuncInterface, LoadMetad public RecordReader<Text, Tuple> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - return new InterRecordReader(); + return new InterRecordReader(retrieveMarkerLengthFromConf(context.getConfiguration()), + retrieveMarkerIntervalFromConf(context.getConfiguration()) + ); } } @@ -141,7 +144,10 @@ implements StoreFuncInterface, LoadMetad Path file = getDefaultWorkFile(job, ""); FileSystem fs = file.getFileSystem(conf); FSDataOutputStream fileOut = fs.create(file, false); - return new InterRecordWriter(fileOut); + return new InterRecordWriter(fileOut, + retrieveMarkerLengthFromConf(job.getConfiguration()), + retrieveMarkerIntervalFromConf(job.getConfiguration()) + ); } } @@ -208,4 +214,22 @@ implements StoreFuncInterface, LoadMetad public void cleanupOnSuccess(String location, Job job) throws IOException { // DEFAULT: do nothing } + + private static int retrieveMarkerLengthFromConf(Configuration conf) { + int requestedLength = conf.getInt(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE, + PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT); + + if (requestedLength > PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_MAX) { + requestedLength = PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_MAX; + } else if (requestedLength < 
PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_MIN) { + requestedLength = PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_MIN; + } + + return requestedLength; + } + + private static long retrieveMarkerIntervalFromConf(Configuration conf) { + return conf.getLong(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL, + PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT); + } } Modified: pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java?rev=1802676&r1=1802675&r2=1802676&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java (original) +++ pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java Sat Jul 22 11:38:47 2017 @@ -44,6 +44,7 @@ import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.lib.input.FileSplit; @@ -506,30 +507,20 @@ public class TestSchemaTuple { File temp = File.createTempFile("tmp", "tmp"); temp.deleteOnExit(); FileOutputStream fos = new FileOutputStream(temp); - DataOutputStream dos = new DataOutputStream(fos); + FSDataOutputStream dos = new FSDataOutputStream(fos, null); - InterRecordWriter writer = new InterRecordWriter(dos); + InterRecordWriter writer = new InterRecordWriter(dos, + PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT, + PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT); - // We add these lines because a part of the InterStorage logic - // is the ability to seek to the next Tuple based on a magic set - // of bytes. This emulates the random byes that will be present - // at the beginning of a split. 
- dos.writeByte(r.nextInt()); - dos.writeByte(r.nextInt()); - dos.writeByte(r.nextInt()); - dos.writeByte(r.nextInt()); - dos.writeByte(r.nextInt()); - dos.writeByte(r.nextInt()); + // This test does not cover the case of overlapping record bytes that may be present at the + // beginning of a split, for that see org.apache.pig.test.TestBinInterSedes#testInterStorageSyncMarker() for (int i = 0; i < sz; i++) { SchemaTuple<?> st = (SchemaTuple<?>)tf.newTuple(); fillWithData(st); writer.write(null, st); written.add(st); - - dos.writeByte(r.nextInt()); - dos.writeByte(r.nextInt()); - dos.writeByte(r.nextInt()); } writer.close(null); @@ -541,7 +532,8 @@ public class TestSchemaTuple { InputSplit is = new FileSplit(new Path(temp.getAbsolutePath()), 0, temp.length(), null); - InterRecordReader reader = new InterRecordReader(); + InterRecordReader reader = new InterRecordReader(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT, + PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT); reader.initialize(is, HadoopShims.createTaskAttemptContext(conf, taskId)); for (int i = 0; i < sz; i++) { Modified: pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java?rev=1802676&r1=1802675&r2=1802676&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java (original) +++ pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java Sat Jul 22 11:38:47 2017 @@ -24,11 +24,16 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; +import java.util.Properties; import java.util.Random; +import org.apache.pig.PigConfiguration; +import org.apache.pig.PigServer; import org.apache.pig.data.BagFactory; 
import org.apache.pig.data.BinInterSedes; import org.apache.pig.data.DataBag; @@ -296,6 +301,142 @@ public class TestBinInterSedes { } } + + /* + The following tests are intended to verify the reading and writing of intermediate files of Pig (of InterStorage) + The test records are 11,14,22,14 bytes long. + Below I illustrate the splits in rows, records as [] with size and sync markers with [M] + */ + + /** + * One sync marker only and three splits where the records overlap the splitends. + * (Reader of 1st split should read every record, readers of 2nd and 3rd splits should read no records.) + * [M(10)] [11] [11- + * -3] [ 22 ] [7- + * -7] + * + * @throws Exception + */ + @Test + public void testSyncMarkerOneMarkerAtBeginningOnly() throws Exception { + testInterStorageSyncMarker(32, 10, 2000L); + } + + /** + * Some sync markers are positioned so that they begin at a split's end and they end in the next split's beginning. + * (Reader of a split has to read until the next sync marker that has all its bytes in a following split.) + * @throws Exception + */ + @Test + public void testSyncMarkerOverlappingMarker() throws Exception { + /* + * [M(16)] [11] [M(16)] [5- + * -9] [M(16)] [ 22 ] [M(1- + * -15)] [14] + */ + testInterStorageSyncMarker(48, 16, 10L); + /* + * [M(4)] [ 4- + * -7] [1- + * - 8 - + * -5] [M(3- + * -1)] [ 7- + * - 8 - + * -7] [M(1- + * -3)] [ 5- + * - 8 - + * -1] + */ + testInterStorageSyncMarker(8, 4, 20L); + } + + /** + * No illustration for this one to save characters .. Sync size is over 3 times the size of split size, this is an + * extremely unlikely scenario. Markers here span over 4 splits. + * @throws Exception + */ + @Test + public void testSyncMarkerLongerMarkerThanSplit() throws Exception { + testInterStorageSyncMarker(5, 16, 20L); + } + + /** + * A sync marker is positioned at exactly the end of the first split without overlapping into the next one. + * (Reader of the 1st split should read past it and into the 2nd split until next marker.) 
+ * + * [M(2)] [11] [14] [M(2)] + * [ 22 ] [M(2)] [ 5- + * -9] + * @throws Exception + */ + @Test + public void testSyncMarkerMarkerOnSplitEnd() throws Exception { + testInterStorageSyncMarker(29, 2, 20L); + } + + /** + * A sync marker is positioned at exactly the beginning of the 3rd split. + * (Reader of the 1st split should read 1st and 2nd splits fully, reader of 2nd split should read no records.) + * + * [M(3)] [11] + * [ 14 ] + * [M(3) [11- + * -11 ] [3- + * -11 ] + * @throws Exception + */ + @Test + public void testSyncMarkerMarkerOnSplitBeginning() throws Exception { + testInterStorageSyncMarker(14, 3, 25L); + } + + private void testInterStorageSyncMarker(int maxSplitSize, int syncSize, long syncInterval) throws Exception { + PigServer pigServer = new PigServer(Util.getLocalTestMode(), new Properties()); + + Properties pigProperties = pigServer.getPigContext().getProperties(); + pigProperties.setProperty("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(maxSplitSize)); + pigProperties.setProperty(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE, String.valueOf(syncSize)); + pigProperties.setProperty(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL, String.valueOf(syncInterval)); + + //Without proper random record markers 0x01020327 would be identified as a marker and 0x50 as an unknown datatype + //ByteBuffer.wrap(new byte[]{0x01, 0x02, 0x03, 0x27, 0x50, 0x0, 0x0, 0x0}).getLong() => 72624011372134400 + + String[] inputData = new String[]{"apple\t1\t1","orange\t2\t2","kiwi\t16909095\t72624011372134400","orange\t4\t4"}; + String[] expected = new String[] {"(apple,1,1)","(orange,2,2)","(kiwi,16909095,72624011372134400)","(orange,4,4)"}; + File inputFile = Util.createInputFile("interStorageInput", "", inputData); + inputFile.deleteOnExit(); + + //Without proper random record markers 0x01020327 would be identified as a marker and although no errors are + // thrown the result will contain incorrect schema and values past this number + 
//ByteBuffer.wrap(new byte[]{0x01, 0x02, 0x03, 0x27, 0x01, 0x0, 0x0, 0x0}).getLong() => 72624010046734336 + + String[] inputData2 = new String[]{"apple\t1\t1","orange\t2\t2","kiwi\t16909095\t72624010046734336","orange\t4\t4"}; + String[] expected2 = new String[] {"(apple,1,1)","(orange,2,2)","(kiwi,16909095,72624010046734336)","(orange,4,4)"}; + File inputFile2 = Util.createInputFile("interStorageInput2", "", inputData2); + inputFile2.deleteOnExit(); + + File binOutputdir = new File("build/test/interStorageTest"); + Util.deleteDirectory(binOutputdir); + + String script = "A = LOAD '"+inputFile.getAbsolutePath()+"' AS (name:chararray, cnt:int, cnt2:long);\n" + + "STORE A INTO '"+binOutputdir.getAbsolutePath()+"' USING org.apache.pig.impl.io.InterStorage();\n" + + "\n" + + "B = LOAD '"+binOutputdir.getAbsolutePath()+"' USING org.apache.pig.impl.io.InterStorage();\n"; + + pigServer.registerQuery(script); + Iterator<Tuple> it = pigServer.openIterator("B"); + Util.checkQueryOutputsAfterSortRecursive(it, expected, + org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("B"))); + + Util.deleteDirectory(binOutputdir); + + pigServer.registerQuery(script.replaceAll(inputFile.getAbsolutePath(), inputFile2.getAbsolutePath())); + it = pigServer.openIterator("B"); + Util.checkQueryOutputsAfterSortRecursive(it, expected2, + org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("B"))); + + } + private void testSerTuple(Tuple t, byte[] expected) throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutput out = new DataOutputStream(baos); Modified: pig/trunk/test/org/apache/pig/test/TestFRJoin2.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java?rev=1802676&r1=1802675&r2=1802676&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestFRJoin2.java (original) +++ 
pig/trunk/test/org/apache/pig/test/TestFRJoin2.java Sat Jul 22 11:38:47 2017 @@ -414,12 +414,18 @@ public class TestFRJoin2 { pigServer.registerQuery("C = foreach C generate MAX(B.x) as x;"); pigServer.registerQuery("D = join A by x, B by x, C by x using 'repl';"); { - // When the replicated input sizes=(12 + 5) is bigger than - // pig.join.replicated.max.bytes=16, we throw exception + // When the replicated input size is bigger than + // pig.join.replicated.max.bytes, we throw exception + // Expected replicated size below: + // Alias B: sync marker + 2 records (1 tuple type byte + 2 integers (0 or 1)) + // Alias C: sync marker + 1 record (1 tuple type byte + 1 integer (1)) + long expectedReplicateSize = PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT + 2*(1 +1+1) + + PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT + 1*(1 +1); + try { pigServer.getPigContext().getProperties().setProperty( PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES, - String.valueOf(16)); + String.valueOf(expectedReplicateSize-1)); pigServer.openIterator("D"); Assert.fail(); } catch (FrontendException e) { @@ -428,10 +434,10 @@ public class TestFRJoin2 { e.getCause().getCause().getCause().getMessage()); } - // If we increase the size to 17, it should work + // If we increase the max size setting to the expected amount it works pigServer.getPigContext().getProperties().setProperty( PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES, - String.valueOf(17)); + String.valueOf(expectedReplicateSize)); pigServer.openIterator("D"); } }