Author: utkarsh
Date: Fri Dec 7 16:55:29 2007
New Revision: 602287

URL: http://svn.apache.org/viewvc?rev=602287&view=rev
Log:
PIG-44: Added adaptive decision of the number of records to hold in memory before spilling
Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=602287&r1=602286&r2=602287&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Fri Dec 7 16:55:29 2007 @@ -50,3 +50,6 @@ PIG-47: Added methods to DataMap to provide access to its content PIG-12: Added time stamps to log4j messages (phunt via gates). + + PIG-44: Added adaptive decision of the number of records to hold in memory + before spilling (utkarsh) Modified: incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java?rev=602287&r1=602286&r2=602287&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java (original) +++ incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java Fri Dec 7 16:55:29 2007 @@ -40,7 +40,9 @@ boolean finishedAdds = false,wantSorting = false, doneSorting = false, sortInProgress = false, wroteUnsortedFile = false; int trueCount = 0; - + int numRecordsToHoldInMemory = 1000; + + boolean eliminateDuplicates = false; EvalSpec spec = null; @@ -50,7 +52,10 @@ * cause us to switch to disk backed mode */ public static long FREE_MEMORY_TO_MAINTAIN = (long)(MAX_MEMORY*.25); - + /** + * want to hold roughly only 1% of max memory, once we are in a low memory condition + */ + public static long TARGET_IN_MEMORY_SIZE = (long)(0.01 * MAX_MEMORY); public BigDataBag(File tempdir) throws IOException { this.tempdir = tempdir; @@ -61,10 +66,16 @@ long usedMemory = Runtime.getRuntime().totalMemory() - 
freeMemory; return MAX_MEMORY-usedMemory > memLimit; } - + + private File getTempFile() throws IOException{ + File store = File.createTempFile("bag",".dat",tempdir); + store.deleteOnExit(); + return store; + } + private void writeContentToDisk() throws IOException{ if (writer==null){ - File store = File.createTempFile("bag",".dat",tempdir); + File store = getTempFile(); stores.add(store); writer = new DataBagFileWriter(store); } @@ -76,7 +87,10 @@ }else{ wroteUnsortedFile = true; } - writer.write(content.iterator()); + long bytesWritten = writer.write(content.iterator()); + //Adjust the number of records to hold in memory so that what + //we hold in memory is about TARGET_IN_MEMORY_SIZE + numRecordsToHoldInMemory = (int)((numRecordsToHoldInMemory * TARGET_IN_MEMORY_SIZE) / bytesWritten); super.clear(); if (wantSorting){ writer.close(); @@ -95,7 +109,7 @@ if (writer == null) { //Want to add in memory super.add(t); - if (!isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN) && trueCount > 10) { + if (!isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN) && content.size() > numRecordsToHoldInMemory) { writeContentToDisk(); } }else{ @@ -133,7 +147,7 @@ Iterator<Tuple> iter = reader.content(); while(iter.hasNext()){ DataBag bag = new DataBag(); - while( iter.hasNext() && isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN/2)){ + while( iter.hasNext() && (isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN/2) || bag.cardinality() < numRecordsToHoldInMemory)){ bag.add(iter.next()); } if(eliminateDuplicates){ @@ -141,7 +155,7 @@ trueCount = bag.cardinality(); }else bag.sort(spec); - File f = File.createTempFile("bag", ".dat",tempdir); + File f = getTempFile(); stores.add(f); DataBagFileWriter writer = new DataBagFileWriter(f); writer.write(bag.content()); @@ -306,7 +320,7 @@ } } - File outputFile = File.createTempFile("bag",".dat",tempdir); + File outputFile = getTempFile(); stores.add(outputFile); writer = new DataBagFileWriter(outputFile); Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java?rev=602287&r1=602286&r2=602287&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java Fri Dec 7 16:55:29 2007 @@ -41,10 +41,20 @@ t.write(out); } - public void write(Iterator<Tuple> iter) throws IOException{ + public long write(Iterator<Tuple> iter) throws IOException{ + + long initialSize = getFileLength(); while (iter.hasNext()) - iter.next().write(out); + iter.next().write(out); + + return getFileLength() - initialSize; } + + public long getFileLength() throws IOException{ + out.flush(); + return store.length(); + } + public void close() throws IOException{ flush(); Modified: incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java?rev=602287&r1=602286&r2=602287&view=diff ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java (original) +++ incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java Fri Dec 7 16:55:29 2007 @@ -19,7 +19,6 @@ import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.File; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; @@ -216,176 +215,78 @@ public void testBigDataBagOnDisk() throws Exception{ Runtime.getRuntime().gc(); - testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 10000); + testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 1000000); } + private enum TestType { + PRE_SORT, + POST_SORT, + PRE_DISTINCT, + POST_DISTINCT, + NONE + } + private void testBigDataBag(long 
freeMemoryToMaintain, int numItems) throws Exception { BigDataBag.FREE_MEMORY_TO_MAINTAIN = freeMemoryToMaintain; - File tmp = File.createTempFile("test", "bag").getParentFile(); - BigDataBag bag = new BigDataBag(tmp); - Iterator<Tuple> it; - int count; - String last; - Random r = new Random(); - - - //Basic test - assertTrue(bag.isEmpty()); - - for(int i = 0; i < numItems; i++) { - Tuple t = new Tuple(2); - t.setField(0, Integer.toHexString(i)); - t.setField(1, i); - bag.add(t); - } - - assertFalse(bag.isEmpty()); - - assertTrue(bag.cardinality() == numItems); - - int lastI = -1; - it = bag.content(); - count = 0; - while(it.hasNext()) { - Tuple t = it.next(); - int ix = Integer.parseInt(t.getAtomField(0).strval(), 16); - assertTrue(Integer.toString(ix).equals(t.getAtomField(1).strval())); - assertEquals(lastI+1, ix); - lastI = ix; - count++; - } - - assertTrue(bag.cardinality() == count); - - bag.distinct(); - - bag.clear(); + + for (TestType testType: TestType.values()){ + BigDataBag bag = BagFactory.getInstance().getNewBigBag(); + + assertTrue(bag.isEmpty()); + + if (testType == TestType.PRE_SORT) + bag.sort(); + else if (testType == TestType.PRE_DISTINCT) + bag.distinct(); + + //generate data and add it to the bag + for(int i = 0; i < numItems; i++) { + Tuple t = new Tuple(1); + t.setField(0, r.nextInt(numItems)); + bag.add(t); + } + + assertFalse(bag.isEmpty()); + + if (testType == TestType.POST_SORT) + bag.sort(); + else if (testType == TestType.POST_DISTINCT) + bag.distinct(); + + + if (testType == TestType.NONE) + assertTrue(bag.cardinality() == numItems); + checkContents(bag, numItems, testType); + checkContents(bag, numItems, testType); - //Test pre sort - - bag.sort(); - - - for(int i = 0; i < numItems; i++) { - Tuple t = new Tuple(1); - t.setField(0, r.nextInt(100000)); - bag.add(t); - } - - it = bag.content(); - count = 0; - last= ""; - while(it.hasNext()) { - Tuple t = it.next(); - String next = t.getAtomField(0).strval(); - 
assertTrue(last.compareTo(next)<=0); - last = next; - count++; - } - - assertTrue(bag.cardinality() == count); - - bag.clear(); - - - //Test post sort - - for(int i = 0; i < numItems; i++) { - Tuple t = new Tuple(1); - t.setField(0, r.nextInt(100000)); - bag.add(t); - } + } + } + + + private void checkContents(DataBag bag, int numItems, TestType testType) throws Exception{ + String last = ""; - bag.sort(); DataBag.notifyInterval = 100; - it = bag.content(); - count = 0; - last= ""; - while(it.hasNext()) { - Tuple t = it.next(); - String next = t.getAtomField(0).strval(); - assertTrue(last.compareTo(next)<=0); - last = next; - count++; - } - - assertTrue(bag.cardinality() == count); - int cnt = numItems/DataBag.notifyInterval; - assertTrue(bag.numNotifies >= cnt); - - bag.clear(); - - //test post-distinct - - - for(int i = 0; i < numItems; i++) { - Tuple t = new Tuple(1); - //To get a lot of duplicates - t.setField(0, r.nextInt(1000)); - bag.add(t); - } - - bag.distinct(); - - it = bag.content(); - count = 0; - last= ""; + Iterator<Tuple> it = bag.content(); + int count = 0; while(it.hasNext()) { - Tuple t = it.next(); - String next = t.getAtomField(0).strval(); - assertTrue(last.compareTo(next)<0); + Tuple t = it.next(); + String next = t.getAtomField(0).strval(); + if (testType == TestType.POST_SORT || testType == TestType.PRE_SORT) + assertTrue(last.compareTo(next)<=0); + else if (testType == TestType.POST_DISTINCT || testType == TestType.PRE_DISTINCT) + assertTrue(last.compareTo(next)<0); last = next; - count++; - } - - assertTrue(bag.cardinality() == count); - - bag.clear(); - - - //Test pre distinct - - bag.distinct(); - - - for(int i = 0; i < numItems; i++) { - Tuple t = new Tuple(1); - //To get a lot of duplicates - t.setField(0, r.nextInt(10)); - bag.add(t); + count++; } - - it = bag.content(); - count = 0; - last= ""; - while(it.hasNext()) { - Tuple t = it.next(); - String next = t.getAtomField(0).strval(); - assertTrue(last.compareTo(next)<0); - last = 
next; - count++; - } - - assertTrue(bag.cardinality() == count); - - //Check if it gives the correct contents the second time around - it = bag.content(); - count = 0; - last= ""; - while(it.hasNext()) { - Tuple t = it.next(); - String next = t.getAtomField(0).strval(); - assertTrue(last.compareTo(next)<0); - last = next; - count++; - } - assertTrue(bag.cardinality() == count); - bag.clear(); + if (testType != TestType.NONE) + assertTrue(bag.numNotifies >= count/DataBag.notifyInterval); } + }